elasticsearch: move to single doctype indices

@@ -1,707 +1,707 @@
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 #
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
7 # You may obtain a copy of the License at
8 #
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
10 #
11 # Unless required by applicable law or agreed to in writing, software
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
15 # limitations under the License.
16
16
17 import bisect
17 import bisect
18 import collections
18 import collections
19 import math
19 import math
20 from datetime import datetime, timedelta
20 from datetime import datetime, timedelta
21
21
22 import sqlalchemy as sa
22 import sqlalchemy as sa
23 import elasticsearch.exceptions
23 import elasticsearch.exceptions
24 import elasticsearch.helpers
24 import elasticsearch.helpers
25
25
26 from celery.utils.log import get_task_logger
26 from celery.utils.log import get_task_logger
27 from zope.sqlalchemy import mark_changed
27 from zope.sqlalchemy import mark_changed
28 from pyramid.threadlocal import get_current_request, get_current_registry
28 from pyramid.threadlocal import get_current_request, get_current_registry
29 from ziggurat_foundations.models.services.resource import ResourceService
29 from ziggurat_foundations.models.services.resource import ResourceService
30
30
31 from appenlight.celery import celery
31 from appenlight.celery import celery
32 from appenlight.models.report_group import ReportGroup
32 from appenlight.models.report_group import ReportGroup
33 from appenlight.models import DBSession, Datastores
33 from appenlight.models import DBSession, Datastores
34 from appenlight.models.report import Report
34 from appenlight.models.report import Report
35 from appenlight.models.log import Log
35 from appenlight.models.log import Log
36 from appenlight.models.metric import Metric
36 from appenlight.models.metric import Metric
37 from appenlight.models.event import Event
37 from appenlight.models.event import Event
38
38
39 from appenlight.models.services.application import ApplicationService
39 from appenlight.models.services.application import ApplicationService
40 from appenlight.models.services.event import EventService
40 from appenlight.models.services.event import EventService
41 from appenlight.models.services.log import LogService
41 from appenlight.models.services.log import LogService
42 from appenlight.models.services.report import ReportService
42 from appenlight.models.services.report import ReportService
43 from appenlight.models.services.report_group import ReportGroupService
43 from appenlight.models.services.report_group import ReportGroupService
44 from appenlight.models.services.user import UserService
44 from appenlight.models.services.user import UserService
45 from appenlight.models.tag import Tag
45 from appenlight.models.tag import Tag
46 from appenlight.lib import print_traceback
46 from appenlight.lib import print_traceback
47 from appenlight.lib.utils import parse_proto, in_batches
47 from appenlight.lib.utils import parse_proto, in_batches
48 from appenlight.lib.ext_json import json
48 from appenlight.lib.ext_json import json
49 from appenlight.lib.redis_keys import REDIS_KEYS
49 from appenlight.lib.redis_keys import REDIS_KEYS
50 from appenlight.lib.enums import ReportType
50 from appenlight.lib.enums import ReportType
51
51
52 log = get_task_logger(__name__)
52 log = get_task_logger(__name__)
53
53
54 sample_boundries = (
54 sample_boundries = (
55 list(range(100, 1000, 100))
55 list(range(100, 1000, 100))
56 + list(range(1000, 10000, 1000))
56 + list(range(1000, 10000, 1000))
57 + list(range(10000, 100000, 5000))
57 + list(range(10000, 100000, 5000))
58 )
58 )
59
59
60
60
61 def pick_sample(total_occurences, report_type=None):
61 def pick_sample(total_occurences, report_type=None):
62 every = 1.0
62 every = 1.0
63 position = bisect.bisect_left(sample_boundries, total_occurences)
63 position = bisect.bisect_left(sample_boundries, total_occurences)
64 if position > 0:
64 if position > 0:
65 if report_type == ReportType.not_found:
65 if report_type == ReportType.not_found:
66 divide = 10.0
66 divide = 10.0
67 else:
67 else:
68 divide = 100.0
68 divide = 100.0
69 every = sample_boundries[position - 1] / divide
69 every = sample_boundries[position - 1] / divide
70 return total_occurences % every == 0
70 return total_occurences % every == 0
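# A sketch of how the sampling above plays out, given the sample_boundries
# list defined near the top of this file: below 100 occurrences every report
# is stored (every stays 1.0); at e.g. total_occurences=250, bisect_left()
# lands past the 200 boundary, every becomes 200 / 100.0 == 2.0 and only every
# 2nd occurrence returns True; for ReportType.not_found the divisor is 10.0,
# so 404-style groups are sampled far more sparsely.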
71
71
72
72
73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
74 def test_exception_task():
74 def test_exception_task():
75 log.error("test celery log", extra={"location": "celery"})
75 log.error("test celery log", extra={"location": "celery"})
76 log.warning("test celery log", extra={"location": "celery"})
76 log.warning("test celery log", extra={"location": "celery"})
77 raise Exception("Celery exception test")
77 raise Exception("Celery exception test")
78
78
79
79
80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
81 def test_retry_exception_task():
81 def test_retry_exception_task():
82 try:
82 try:
83 import time
83 import time
84
84
85 time.sleep(1.3)
85 time.sleep(1.3)
86 log.error("test retry celery log", extra={"location": "celery"})
86 log.error("test retry celery log", extra={"location": "celery"})
87 log.warning("test retry celery log", extra={"location": "celery"})
87 log.warning("test retry celery log", extra={"location": "celery"})
88 raise Exception("Celery exception test")
88 raise Exception("Celery exception test")
89 except Exception as exc:
89 except Exception as exc:
90 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
90 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
91 raise
91 raise
92 test_retry_exception_task.retry(exc=exc)
92 test_retry_exception_task.retry(exc=exc)
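# Note: the CELERY_EAGER_PROPAGATES_EXCEPTIONS guard used here recurs in the
# tasks below - when celery runs tasks eagerly (e.g. during tests) the
# original exception is re-raised instead of scheduling a retry.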
93
93
94
94
95 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
95 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
96 def add_reports(resource_id, request_params, dataset, **kwargs):
96 def add_reports(resource_id, request_params, dataset, **kwargs):
97 proto_version = parse_proto(request_params.get("protocol_version", ""))
97 proto_version = parse_proto(request_params.get("protocol_version", ""))
98 current_time = datetime.utcnow().replace(second=0, microsecond=0)
98 current_time = datetime.utcnow().replace(second=0, microsecond=0)
99 try:
99 try:
100 # we will store ES docs here for a single bulk insert later
100 # we will store ES docs here for a single bulk insert later
101 es_report_docs = {}
101 es_report_docs = {}
102 es_report_group_docs = {}
102 es_report_group_docs = {}
103 resource = ApplicationService.by_id(resource_id)
103 resource = ApplicationService.by_id(resource_id)
104
104
105 tags = []
105 tags = []
106 es_slow_calls_docs = {}
106 es_slow_calls_docs = {}
107 es_reports_stats_rows = {}
107 es_reports_stats_rows = {}
108 for report_data in dataset:
108 for report_data in dataset:
109 # build report details for later
109 # build report details for later
110 added_details = 0
110 added_details = 0
111 report = Report()
111 report = Report()
112 report.set_data(report_data, resource, proto_version)
112 report.set_data(report_data, resource, proto_version)
113 report._skip_ft_index = True
113 report._skip_ft_index = True
114
114
115 # find the latest group in this month's partition
115 # find the latest group in this month's partition
116 report_group = ReportGroupService.by_hash_and_resource(
116 report_group = ReportGroupService.by_hash_and_resource(
117 report.resource_id,
117 report.resource_id,
118 report.grouping_hash,
118 report.grouping_hash,
119 since_when=datetime.utcnow().date().replace(day=1),
119 since_when=datetime.utcnow().date().replace(day=1),
120 )
120 )
121 occurences = report_data.get("occurences", 1)
121 occurences = report_data.get("occurences", 1)
122 if not report_group:
122 if not report_group:
123 # total_reports will be incremented a moment later
123 # total_reports will be incremented a moment later
124 report_group = ReportGroup(
124 report_group = ReportGroup(
125 grouping_hash=report.grouping_hash,
125 grouping_hash=report.grouping_hash,
126 occurences=0,
126 occurences=0,
127 total_reports=0,
127 total_reports=0,
128 last_report=0,
128 last_report=0,
129 priority=report.priority,
129 priority=report.priority,
130 error=report.error,
130 error=report.error,
131 first_timestamp=report.start_time,
131 first_timestamp=report.start_time,
132 )
132 )
133 report_group._skip_ft_index = True
133 report_group._skip_ft_index = True
134 report_group.report_type = report.report_type
134 report_group.report_type = report.report_type
135 report.report_group_time = report_group.first_timestamp
135 report.report_group_time = report_group.first_timestamp
136 add_sample = pick_sample(
136 add_sample = pick_sample(
137 report_group.occurences, report_type=report_group.report_type
137 report_group.occurences, report_type=report_group.report_type
138 )
138 )
139 if add_sample:
139 if add_sample:
140 resource.report_groups.append(report_group)
140 resource.report_groups.append(report_group)
141 report_group.reports.append(report)
141 report_group.reports.append(report)
142 added_details += 1
142 added_details += 1
143 DBSession.flush()
143 DBSession.flush()
144 if report.partition_id not in es_report_docs:
144 if report.partition_id not in es_report_docs:
145 es_report_docs[report.partition_id] = []
145 es_report_docs[report.partition_id] = []
146 es_report_docs[report.partition_id].append(report.es_doc())
146 es_report_docs[report.partition_id].append(report.es_doc())
147 tags.extend(list(report.tags.items()))
147 tags.extend(list(report.tags.items()))
148 slow_calls = report.add_slow_calls(report_data, report_group)
148 slow_calls = report.add_slow_calls(report_data, report_group)
149 DBSession.flush()
149 DBSession.flush()
150 for s_call in slow_calls:
150 for s_call in slow_calls:
151 if s_call.partition_id not in es_slow_calls_docs:
151 if s_call.partition_id not in es_slow_calls_docs:
152 es_slow_calls_docs[s_call.partition_id] = []
152 es_slow_calls_docs[s_call.partition_id] = []
153 es_slow_calls_docs[s_call.partition_id].append(s_call.es_doc())
153 es_slow_calls_docs[s_call.partition_id].append(s_call.es_doc())
154 # try generating new stat rows if needed
154 # try generating new stat rows if needed
155 else:
155 else:
156 # required for postprocessing to not fail later
156 # required for postprocessing to not fail later
157 report.report_group = report_group
157 report.report_group = report_group
158
158
159 stat_row = ReportService.generate_stat_rows(report, resource, report_group)
159 stat_row = ReportService.generate_stat_rows(report, resource, report_group)
160 if stat_row.partition_id not in es_reports_stats_rows:
160 if stat_row.partition_id not in es_reports_stats_rows:
161 es_reports_stats_rows[stat_row.partition_id] = []
161 es_reports_stats_rows[stat_row.partition_id] = []
162 es_reports_stats_rows[stat_row.partition_id].append(stat_row.es_doc())
162 es_reports_stats_rows[stat_row.partition_id].append(stat_row.es_doc())
163
163
164 # see if we should mark every 10th/100th occurrence of the report group
164 # see if we should mark every 10th/100th occurrence of the report group
165 last_occurences_10 = int(math.floor(report_group.occurences / 10))
165 last_occurences_10 = int(math.floor(report_group.occurences / 10))
166 curr_occurences_10 = int(
166 curr_occurences_10 = int(
167 math.floor((report_group.occurences + report.occurences) / 10)
167 math.floor((report_group.occurences + report.occurences) / 10)
168 )
168 )
169 last_occurences_100 = int(math.floor(report_group.occurences / 100))
169 last_occurences_100 = int(math.floor(report_group.occurences / 100))
170 curr_occurences_100 = int(
170 curr_occurences_100 = int(
171 math.floor((report_group.occurences + report.occurences) / 100)
171 math.floor((report_group.occurences + report.occurences) / 100)
172 )
172 )
173 notify_occurences_10 = last_occurences_10 != curr_occurences_10
173 notify_occurences_10 = last_occurences_10 != curr_occurences_10
174 notify_occurences_100 = last_occurences_100 != curr_occurences_100
174 notify_occurences_100 = last_occurences_100 != curr_occurences_100
175 report_group.occurences = ReportGroup.occurences + occurences
175 report_group.occurences = ReportGroup.occurences + occurences
176 report_group.last_timestamp = report.start_time
176 report_group.last_timestamp = report.start_time
177 report_group.summed_duration = ReportGroup.summed_duration + report.duration
177 report_group.summed_duration = ReportGroup.summed_duration + report.duration
178 summed_duration = ReportGroup.summed_duration + report.duration
178 summed_duration = ReportGroup.summed_duration + report.duration
179 summed_occurences = ReportGroup.occurences + occurences
179 summed_occurences = ReportGroup.occurences + occurences
180 report_group.average_duration = summed_duration / summed_occurences
180 report_group.average_duration = summed_duration / summed_occurences
181 report_group.run_postprocessing(report)
181 report_group.run_postprocessing(report)
182 if added_details:
182 if added_details:
183 report_group.total_reports = ReportGroup.total_reports + 1
183 report_group.total_reports = ReportGroup.total_reports + 1
184 report_group.last_report = report.id
184 report_group.last_report = report.id
185 report_group.set_notification_info(
185 report_group.set_notification_info(
186 notify_10=notify_occurences_10, notify_100=notify_occurences_100
186 notify_10=notify_occurences_10, notify_100=notify_occurences_100
187 )
187 )
188 DBSession.flush()
188 DBSession.flush()
189 report_group.get_report().notify_channel(report_group)
189 report_group.get_report().notify_channel(report_group)
190 if report_group.partition_id not in es_report_group_docs:
190 if report_group.partition_id not in es_report_group_docs:
191 es_report_group_docs[report_group.partition_id] = []
191 es_report_group_docs[report_group.partition_id] = []
192 es_report_group_docs[report_group.partition_id].append(
192 es_report_group_docs[report_group.partition_id].append(
193 report_group.es_doc()
193 report_group.es_doc()
194 )
194 )
195
195
196 action = "REPORT"
196 action = "REPORT"
197 log_msg = "%s: %s %s, client: %s, proto: %s" % (
197 log_msg = "%s: %s %s, client: %s, proto: %s" % (
198 action,
198 action,
199 report_data.get("http_status", "unknown"),
199 report_data.get("http_status", "unknown"),
200 str(resource),
200 str(resource),
201 report_data.get("client"),
201 report_data.get("client"),
202 proto_version,
202 proto_version,
203 )
203 )
204 log.info(log_msg)
204 log.info(log_msg)
205 total_reports = len(dataset)
205 total_reports = len(dataset)
206 redis_pipeline = Datastores.redis.pipeline(transaction=False)
206 redis_pipeline = Datastores.redis.pipeline(transaction=False)
207 key = REDIS_KEYS["counters"]["reports_per_minute"].format(current_time)
207 key = REDIS_KEYS["counters"]["reports_per_minute"].format(current_time)
208 redis_pipeline.incr(key, total_reports)
208 redis_pipeline.incr(key, total_reports)
209 redis_pipeline.expire(key, 3600 * 24)
209 redis_pipeline.expire(key, 3600 * 24)
210 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
210 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
211 resource.owner_user_id, current_time
211 resource.owner_user_id, current_time
212 )
212 )
213 redis_pipeline.incr(key, total_reports)
213 redis_pipeline.incr(key, total_reports)
214 redis_pipeline.expire(key, 3600)
214 redis_pipeline.expire(key, 3600)
215 key = REDIS_KEYS["counters"]["reports_per_hour_per_app"].format(
215 key = REDIS_KEYS["counters"]["reports_per_hour_per_app"].format(
216 resource_id, current_time.replace(minute=0)
216 resource_id, current_time.replace(minute=0)
217 )
217 )
218 redis_pipeline.incr(key, total_reports)
218 redis_pipeline.incr(key, total_reports)
219 redis_pipeline.expire(key, 3600 * 24 * 7)
219 redis_pipeline.expire(key, 3600 * 24 * 7)
220 redis_pipeline.sadd(
220 redis_pipeline.sadd(
221 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
221 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
222 current_time.replace(minute=0)
222 current_time.replace(minute=0)
223 ),
223 ),
224 resource_id,
224 resource_id,
225 )
225 )
226 redis_pipeline.execute()
226 redis_pipeline.execute()
227
227
228 add_reports_es(es_report_group_docs, es_report_docs)
228 add_reports_es(es_report_group_docs, es_report_docs)
229 add_reports_slow_calls_es(es_slow_calls_docs)
229 add_reports_slow_calls_es(es_slow_calls_docs)
230 add_reports_stats_rows_es(es_reports_stats_rows)
230 add_reports_stats_rows_es(es_reports_stats_rows)
231 return True
231 return True
232 except Exception as exc:
232 except Exception as exc:
233 print_traceback(log)
233 print_traceback(log)
234 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
234 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
235 raise
235 raise
236 add_reports.retry(exc=exc)
236 add_reports.retry(exc=exc)
237
237
238
238
239 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
239 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
240 def add_reports_es(report_group_docs, report_docs):
240 def add_reports_es(report_group_docs, report_docs):
241 for k, v in report_group_docs.items():
241 for k, v in report_group_docs.items():
242 to_update = {"_index": k, "_type": "report_group"}
242 to_update = {"_index": k, "_type": "report"}
243 [i.update(to_update) for i in v]
243 [i.update(to_update) for i in v]
244 elasticsearch.helpers.bulk(Datastores.es, v)
244 elasticsearch.helpers.bulk(Datastores.es, v)
245 for k, v in report_docs.items():
245 for k, v in report_docs.items():
246 to_update = {"_index": k, "_type": "report"}
246 to_update = {"_index": k, "_type": "report"}
247 [i.update(to_update) for i in v]
247 [i.update(to_update) for i in v]
248 elasticsearch.helpers.bulk(Datastores.es, v)
248 elasticsearch.helpers.bulk(Datastores.es, v)
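# Note: elasticsearch.helpers.bulk() consumes action dicts, which is why each
# partition key is applied as "_index" and a doctype as "_type" before the
# bulk call, e.g. (index name purely illustrative):
#   {"_index": "rcae_r_2017_05", "_type": "report", "error": "...", ...}
# With this changeset report groups share the single "report" doctype instead
# of the earlier separate "report_group" type.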
249
249
250
250
251 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
251 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
252 def add_reports_slow_calls_es(es_docs):
252 def add_reports_slow_calls_es(es_docs):
253 for k, v in es_docs.items():
253 for k, v in es_docs.items():
254 to_update = {"_index": k, "_type": "log"}
254 to_update = {"_index": k, "_type": "log"}
255 [i.update(to_update) for i in v]
255 [i.update(to_update) for i in v]
256 elasticsearch.helpers.bulk(Datastores.es, v)
256 elasticsearch.helpers.bulk(Datastores.es, v)
257
257
258
258
259 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
259 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
260 def add_reports_stats_rows_es(es_docs):
260 def add_reports_stats_rows_es(es_docs):
261 for k, v in es_docs.items():
261 for k, v in es_docs.items():
262 to_update = {"_index": k, "_type": "log"}
262 to_update = {"_index": k, "_type": "report"}
263 [i.update(to_update) for i in v]
263 [i.update(to_update) for i in v]
264 elasticsearch.helpers.bulk(Datastores.es, v)
264 elasticsearch.helpers.bulk(Datastores.es, v)
265
265
266
266
267 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
267 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
268 def add_logs(resource_id, request_params, dataset, **kwargs):
268 def add_logs(resource_id, request_params, dataset, **kwargs):
269 proto_version = request_params.get("protocol_version")
269 proto_version = request_params.get("protocol_version")
270 current_time = datetime.utcnow().replace(second=0, microsecond=0)
270 current_time = datetime.utcnow().replace(second=0, microsecond=0)
271
271
272 try:
272 try:
273 es_docs = collections.defaultdict(list)
273 es_docs = collections.defaultdict(list)
274 resource = ApplicationService.by_id_cached()(resource_id)
274 resource = ApplicationService.by_id_cached()(resource_id)
275 resource = DBSession.merge(resource, load=False)
275 resource = DBSession.merge(resource, load=False)
276 ns_pairs = []
276 ns_pairs = []
277 for entry in dataset:
277 for entry in dataset:
278 # gather pk and ns so we can remove older versions of row later
278 # gather pk and ns so we can remove older versions of row later
279 if entry["primary_key"] is not None:
279 if entry["primary_key"] is not None:
280 ns_pairs.append({"pk": entry["primary_key"], "ns": entry["namespace"]})
280 ns_pairs.append({"pk": entry["primary_key"], "ns": entry["namespace"]})
281 log_entry = Log()
281 log_entry = Log()
282 log_entry.set_data(entry, resource=resource)
282 log_entry.set_data(entry, resource=resource)
283 log_entry._skip_ft_index = True
283 log_entry._skip_ft_index = True
284 resource.logs.append(log_entry)
284 resource.logs.append(log_entry)
285 DBSession.flush()
285 DBSession.flush()
286 # insert non pk rows first
286 # insert non pk rows first
287 if entry["primary_key"] is None:
287 if entry["primary_key"] is None:
288 es_docs[log_entry.partition_id].append(log_entry.es_doc())
288 es_docs[log_entry.partition_id].append(log_entry.es_doc())
289
289
290 # 2nd pass to delete all log entries from db foe same pk/ns pair
290 # 2nd pass to delete all log entries from db for same pk/ns pair
291 if ns_pairs:
291 if ns_pairs:
292 ids_to_delete = []
292 ids_to_delete = []
293 es_docs = collections.defaultdict(list)
293 es_docs = collections.defaultdict(list)
294 es_docs_to_delete = collections.defaultdict(list)
294 es_docs_to_delete = collections.defaultdict(list)
295 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
295 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
296 list_of_pairs=ns_pairs
296 list_of_pairs=ns_pairs
297 )
297 )
298 log_dict = {}
298 log_dict = {}
299 for log_entry in found_pkey_logs:
299 for log_entry in found_pkey_logs:
300 log_key = (log_entry.primary_key, log_entry.namespace)
300 log_key = (log_entry.primary_key, log_entry.namespace)
301 if log_key not in log_dict:
301 if log_key not in log_dict:
302 log_dict[log_key] = []
302 log_dict[log_key] = []
303 log_dict[log_key].append(log_entry)
303 log_dict[log_key].append(log_entry)
304
304
305 for ns, entry_list in log_dict.items():
305 for ns, entry_list in log_dict.items():
306 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
306 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
307 # newest row needs to be indexed in es
307 # newest row needs to be indexed in es
308 log_entry = entry_list[-1]
308 log_entry = entry_list[-1]
309 # delete everything from pg and ES, leave the last row in pg
309 # delete everything from pg and ES, leave the last row in pg
310 for e in entry_list[:-1]:
310 for e in entry_list[:-1]:
311 ids_to_delete.append(e.log_id)
311 ids_to_delete.append(e.log_id)
312 es_docs_to_delete[e.partition_id].append(e.delete_hash)
312 es_docs_to_delete[e.partition_id].append(e.delete_hash)
313
313
314 es_docs_to_delete[log_entry.partition_id].append(log_entry.delete_hash)
314 es_docs_to_delete[log_entry.partition_id].append(log_entry.delete_hash)
315
315
316 es_docs[log_entry.partition_id].append(log_entry.es_doc())
316 es_docs[log_entry.partition_id].append(log_entry.es_doc())
317
317
318 if ids_to_delete:
318 if ids_to_delete:
319 query = DBSession.query(Log).filter(Log.log_id.in_(ids_to_delete))
319 query = DBSession.query(Log).filter(Log.log_id.in_(ids_to_delete))
320 query.delete(synchronize_session=False)
320 query.delete(synchronize_session=False)
321 if es_docs_to_delete:
321 if es_docs_to_delete:
322 # batch this to avoid problems with default ES bulk limits
322 # batch this to avoid problems with default ES bulk limits
323 for es_index in es_docs_to_delete.keys():
323 for es_index in es_docs_to_delete.keys():
324 for batch in in_batches(es_docs_to_delete[es_index], 20):
324 for batch in in_batches(es_docs_to_delete[es_index], 20):
325 query = {"query": {"terms": {"delete_hash": batch}}}
325 query = {"query": {"terms": {"delete_hash": batch}}}
326
326
327 try:
327 try:
328 Datastores.es.delete_by_query(
328 Datastores.es.delete_by_query(
329 index=es_index, doc_type="log",
329 index=es_index, doc_type="log",
330 body=query, conflicts="proceed"
330 body=query, conflicts="proceed"
331 )
331 )
332 except elasticsearch.exceptions.NotFoundError as exc:
332 except elasticsearch.exceptions.NotFoundError as exc:
333 msg = "skipping index {}".format(es_index)
333 msg = "skipping index {}".format(es_index)
334 log.info(msg)
334 log.info(msg)
335
335
336 total_logs = len(dataset)
336 total_logs = len(dataset)
337
337
338 log_msg = "LOG_NEW: %s, entries: %s, proto:%s" % (
338 log_msg = "LOG_NEW: %s, entries: %s, proto:%s" % (
339 str(resource),
339 str(resource),
340 total_logs,
340 total_logs,
341 proto_version,
341 proto_version,
342 )
342 )
343 log.info(log_msg)
343 log.info(log_msg)
344 # mark_changed(session)
344 # mark_changed(session)
345 redis_pipeline = Datastores.redis.pipeline(transaction=False)
345 redis_pipeline = Datastores.redis.pipeline(transaction=False)
346 key = REDIS_KEYS["counters"]["logs_per_minute"].format(current_time)
346 key = REDIS_KEYS["counters"]["logs_per_minute"].format(current_time)
347 redis_pipeline.incr(key, total_logs)
347 redis_pipeline.incr(key, total_logs)
348 redis_pipeline.expire(key, 3600 * 24)
348 redis_pipeline.expire(key, 3600 * 24)
349 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
349 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
350 resource.owner_user_id, current_time
350 resource.owner_user_id, current_time
351 )
351 )
352 redis_pipeline.incr(key, total_logs)
352 redis_pipeline.incr(key, total_logs)
353 redis_pipeline.expire(key, 3600)
353 redis_pipeline.expire(key, 3600)
354 key = REDIS_KEYS["counters"]["logs_per_hour_per_app"].format(
354 key = REDIS_KEYS["counters"]["logs_per_hour_per_app"].format(
355 resource_id, current_time.replace(minute=0)
355 resource_id, current_time.replace(minute=0)
356 )
356 )
357 redis_pipeline.incr(key, total_logs)
357 redis_pipeline.incr(key, total_logs)
358 redis_pipeline.expire(key, 3600 * 24 * 7)
358 redis_pipeline.expire(key, 3600 * 24 * 7)
359 redis_pipeline.sadd(
359 redis_pipeline.sadd(
360 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
360 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
361 current_time.replace(minute=0)
361 current_time.replace(minute=0)
362 ),
362 ),
363 resource_id,
363 resource_id,
364 )
364 )
365 redis_pipeline.execute()
365 redis_pipeline.execute()
366 add_logs_es(es_docs)
366 add_logs_es(es_docs)
367 return True
367 return True
368 except Exception as exc:
368 except Exception as exc:
369 print_traceback(log)
369 print_traceback(log)
370 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
370 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
371 raise
371 raise
372 add_logs.retry(exc=exc)
372 add_logs.retry(exc=exc)
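# Note on the primary-key pass above: rows sharing a (primary_key, namespace)
# pair are treated as versions of one logical log line - all older versions
# are deleted from both postgres and ES (via their delete_hash) and only the
# newest row is re-indexed, so repeated submissions with the same pk overwrite
# earlier data rather than accumulate duplicates.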
373
373
374
374
375 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
375 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
376 def add_logs_es(es_docs):
376 def add_logs_es(es_docs):
377 for k, v in es_docs.items():
377 for k, v in es_docs.items():
378 to_update = {"_index": k, "_type": "log"}
378 to_update = {"_index": k, "_type": "log"}
379 [i.update(to_update) for i in v]
379 [i.update(to_update) for i in v]
380 elasticsearch.helpers.bulk(Datastores.es, v)
380 elasticsearch.helpers.bulk(Datastores.es, v)
381
381
382
382
383 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
383 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
384 def add_metrics(resource_id, request_params, dataset, proto_version):
384 def add_metrics(resource_id, request_params, dataset, proto_version):
385 current_time = datetime.utcnow().replace(second=0, microsecond=0)
385 current_time = datetime.utcnow().replace(second=0, microsecond=0)
386 try:
386 try:
387 resource = ApplicationService.by_id_cached()(resource_id)
387 resource = ApplicationService.by_id_cached()(resource_id)
388 resource = DBSession.merge(resource, load=False)
388 resource = DBSession.merge(resource, load=False)
389 es_docs = []
389 es_docs = []
390 rows = []
390 rows = []
391 for metric in dataset:
391 for metric in dataset:
392 tags = dict(metric["tags"])
392 tags = dict(metric["tags"])
393 server_n = tags.get("server_name", metric["server_name"]).lower()
393 server_n = tags.get("server_name", metric["server_name"]).lower()
394 tags["server_name"] = server_n or "unknown"
394 tags["server_name"] = server_n or "unknown"
395 new_metric = Metric(
395 new_metric = Metric(
396 timestamp=metric["timestamp"],
396 timestamp=metric["timestamp"],
397 resource_id=resource.resource_id,
397 resource_id=resource.resource_id,
398 namespace=metric["namespace"],
398 namespace=metric["namespace"],
399 tags=tags,
399 tags=tags,
400 )
400 )
401 rows.append(new_metric)
401 rows.append(new_metric)
402 es_docs.append(new_metric.es_doc())
402 es_docs.append(new_metric.es_doc())
403 session = DBSession()
403 session = DBSession()
404 session.bulk_save_objects(rows)
404 session.bulk_save_objects(rows)
405 session.flush()
405 session.flush()
406
406
407 action = "METRICS"
407 action = "METRICS"
408 metrics_msg = "%s: %s, metrics: %s, proto:%s" % (
408 metrics_msg = "%s: %s, metrics: %s, proto:%s" % (
409 action,
409 action,
410 str(resource),
410 str(resource),
411 len(dataset),
411 len(dataset),
412 proto_version,
412 proto_version,
413 )
413 )
414 log.info(metrics_msg)
414 log.info(metrics_msg)
415
415
416 mark_changed(session)
416 mark_changed(session)
417 redis_pipeline = Datastores.redis.pipeline(transaction=False)
417 redis_pipeline = Datastores.redis.pipeline(transaction=False)
418 key = REDIS_KEYS["counters"]["metrics_per_minute"].format(current_time)
418 key = REDIS_KEYS["counters"]["metrics_per_minute"].format(current_time)
419 redis_pipeline.incr(key, len(rows))
419 redis_pipeline.incr(key, len(rows))
420 redis_pipeline.expire(key, 3600 * 24)
420 redis_pipeline.expire(key, 3600 * 24)
421 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
421 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
422 resource.owner_user_id, current_time
422 resource.owner_user_id, current_time
423 )
423 )
424 redis_pipeline.incr(key, len(rows))
424 redis_pipeline.incr(key, len(rows))
425 redis_pipeline.expire(key, 3600)
425 redis_pipeline.expire(key, 3600)
426 key = REDIS_KEYS["counters"]["metrics_per_hour_per_app"].format(
426 key = REDIS_KEYS["counters"]["metrics_per_hour_per_app"].format(
427 resource_id, current_time.replace(minute=0)
427 resource_id, current_time.replace(minute=0)
428 )
428 )
429 redis_pipeline.incr(key, len(rows))
429 redis_pipeline.incr(key, len(rows))
430 redis_pipeline.expire(key, 3600 * 24 * 7)
430 redis_pipeline.expire(key, 3600 * 24 * 7)
431 redis_pipeline.sadd(
431 redis_pipeline.sadd(
432 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
432 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
433 current_time.replace(minute=0)
433 current_time.replace(minute=0)
434 ),
434 ),
435 resource_id,
435 resource_id,
436 )
436 )
437 redis_pipeline.execute()
437 redis_pipeline.execute()
438 add_metrics_es(es_docs)
438 add_metrics_es(es_docs)
439 return True
439 return True
440 except Exception as exc:
440 except Exception as exc:
441 print_traceback(log)
441 print_traceback(log)
442 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
442 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
443 raise
443 raise
444 add_metrics.retry(exc=exc)
444 add_metrics.retry(exc=exc)
445
445
446
446
447 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
447 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
448 def add_metrics_es(es_docs):
448 def add_metrics_es(es_docs):
449 for doc in es_docs:
449 for doc in es_docs:
450 partition = "rcae_m_%s" % doc["timestamp"].strftime("%Y_%m_%d")
450 partition = "rcae_m_%s" % doc["timestamp"].strftime("%Y_%m_%d")
451 Datastores.es.index(partition, "log", doc)
451 Datastores.es.index(partition, "log", doc)
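# Note: metrics are indexed one document at a time into a per-day index, e.g.
# a metric stamped 2017-05-04 ends up in "rcae_m_2017_05_04"; the positional
# call above maps to index(index, doc_type, body) in the older elasticsearch-py
# client this code appears to target.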
452
452
453
453
454 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
454 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
455 def check_user_report_notifications(resource_id):
455 def check_user_report_notifications(resource_id):
456 since_when = datetime.utcnow()
456 since_when = datetime.utcnow()
457 try:
457 try:
458 request = get_current_request()
458 request = get_current_request()
459 application = ApplicationService.by_id(resource_id)
459 application = ApplicationService.by_id(resource_id)
460 if not application:
460 if not application:
461 return
461 return
462 error_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
462 error_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
463 ReportType.error, resource_id
463 ReportType.error, resource_id
464 )
464 )
465 slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
465 slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
466 ReportType.slow, resource_id
466 ReportType.slow, resource_id
467 )
467 )
468 error_group_ids = Datastores.redis.smembers(error_key)
468 error_group_ids = Datastores.redis.smembers(error_key)
469 slow_group_ids = Datastores.redis.smembers(slow_key)
469 slow_group_ids = Datastores.redis.smembers(slow_key)
470 Datastores.redis.delete(error_key)
470 Datastores.redis.delete(error_key)
471 Datastores.redis.delete(slow_key)
471 Datastores.redis.delete(slow_key)
472 err_gids = [int(g_id) for g_id in error_group_ids]
472 err_gids = [int(g_id) for g_id in error_group_ids]
473 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
473 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
474 group_ids = err_gids + slow_gids
474 group_ids = err_gids + slow_gids
475 occurence_dict = {}
475 occurence_dict = {}
476 for g_id in group_ids:
476 for g_id in group_ids:
477 key = REDIS_KEYS["counters"]["report_group_occurences"].format(g_id)
477 key = REDIS_KEYS["counters"]["report_group_occurences"].format(g_id)
478 val = Datastores.redis.get(key)
478 val = Datastores.redis.get(key)
479 Datastores.redis.delete(key)
479 Datastores.redis.delete(key)
480 if val:
480 if val:
481 occurence_dict[g_id] = int(val)
481 occurence_dict[g_id] = int(val)
482 else:
482 else:
483 occurence_dict[g_id] = 1
483 occurence_dict[g_id] = 1
484 report_groups = ReportGroupService.by_ids(group_ids)
484 report_groups = ReportGroupService.by_ids(group_ids)
485 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
485 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
486
486
487 ApplicationService.check_for_groups_alert(
487 ApplicationService.check_for_groups_alert(
488 application,
488 application,
489 "alert",
489 "alert",
490 report_groups=report_groups,
490 report_groups=report_groups,
491 occurence_dict=occurence_dict,
491 occurence_dict=occurence_dict,
492 )
492 )
493 users = set(
493 users = set(
494 [p.user for p in ResourceService.users_for_perm(application, "view")]
494 [p.user for p in ResourceService.users_for_perm(application, "view")]
495 )
495 )
496 report_groups = report_groups.all()
496 report_groups = report_groups.all()
497 for user in users:
497 for user in users:
498 UserService.report_notify(
498 UserService.report_notify(
499 user,
499 user,
500 request,
500 request,
501 application,
501 application,
502 report_groups=report_groups,
502 report_groups=report_groups,
503 occurence_dict=occurence_dict,
503 occurence_dict=occurence_dict,
504 )
504 )
505 for group in report_groups:
505 for group in report_groups:
506 # marks report_groups as notified
506 # marks report_groups as notified
507 if not group.notified:
507 if not group.notified:
508 group.notified = True
508 group.notified = True
509 except Exception as exc:
509 except Exception as exc:
510 print_traceback(log)
510 print_traceback(log)
511 raise
511 raise
512
512
513
513
514 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
514 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
515 def check_alerts(resource_id):
515 def check_alerts(resource_id):
516 since_when = datetime.utcnow()
516 since_when = datetime.utcnow()
517 try:
517 try:
518 request = get_current_request()
518 request = get_current_request()
519 application = ApplicationService.by_id(resource_id)
519 application = ApplicationService.by_id(resource_id)
520 if not application:
520 if not application:
521 return
521 return
522 error_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
522 error_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
523 ReportType.error, resource_id
523 ReportType.error, resource_id
524 )
524 )
525 slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
525 slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
526 ReportType.slow, resource_id
526 ReportType.slow, resource_id
527 )
527 )
528 error_group_ids = Datastores.redis.smembers(error_key)
528 error_group_ids = Datastores.redis.smembers(error_key)
529 slow_group_ids = Datastores.redis.smembers(slow_key)
529 slow_group_ids = Datastores.redis.smembers(slow_key)
530 Datastores.redis.delete(error_key)
530 Datastores.redis.delete(error_key)
531 Datastores.redis.delete(slow_key)
531 Datastores.redis.delete(slow_key)
532 err_gids = [int(g_id) for g_id in error_group_ids]
532 err_gids = [int(g_id) for g_id in error_group_ids]
533 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
533 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
534 group_ids = err_gids + slow_gids
534 group_ids = err_gids + slow_gids
535 occurence_dict = {}
535 occurence_dict = {}
536 for g_id in group_ids:
536 for g_id in group_ids:
537 key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format(
537 key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format(
538 g_id
538 g_id
539 )
539 )
540 val = Datastores.redis.get(key)
540 val = Datastores.redis.get(key)
541 Datastores.redis.delete(key)
541 Datastores.redis.delete(key)
542 if val:
542 if val:
543 occurence_dict[g_id] = int(val)
543 occurence_dict[g_id] = int(val)
544 else:
544 else:
545 occurence_dict[g_id] = 1
545 occurence_dict[g_id] = 1
546 report_groups = ReportGroupService.by_ids(group_ids)
546 report_groups = ReportGroupService.by_ids(group_ids)
547 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
547 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
548
548
549 ApplicationService.check_for_groups_alert(
549 ApplicationService.check_for_groups_alert(
550 application,
550 application,
551 "alert",
551 "alert",
552 report_groups=report_groups,
552 report_groups=report_groups,
553 occurence_dict=occurence_dict,
553 occurence_dict=occurence_dict,
554 since_when=since_when,
554 since_when=since_when,
555 )
555 )
556 except Exception as exc:
556 except Exception as exc:
557 print_traceback(log)
557 print_traceback(log)
558 raise
558 raise
559
559
560
560
561 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
561 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
562 def close_alerts():
562 def close_alerts():
563 log.warning("Checking alerts")
563 log.warning("Checking alerts")
564 since_when = datetime.utcnow()
564 since_when = datetime.utcnow()
565 try:
565 try:
566 event_types = [
566 event_types = [
567 Event.types["error_report_alert"],
567 Event.types["error_report_alert"],
568 Event.types["slow_report_alert"],
568 Event.types["slow_report_alert"],
569 ]
569 ]
570 statuses = [Event.statuses["active"]]
570 statuses = [Event.statuses["active"]]
571 # get events older than 5 min
571 # get events older than 5 min
572 events = EventService.by_type_and_status(
572 events = EventService.by_type_and_status(
573 event_types, statuses, older_than=(since_when - timedelta(minutes=5))
573 event_types, statuses, older_than=(since_when - timedelta(minutes=5))
574 )
574 )
575 for event in events:
575 for event in events:
576 # see if we can close them
576 # see if we can close them
577 event.validate_or_close(since_when=(since_when - timedelta(minutes=1)))
577 event.validate_or_close(since_when=(since_when - timedelta(minutes=1)))
578 except Exception as exc:
578 except Exception as exc:
579 print_traceback(log)
579 print_traceback(log)
580 raise
580 raise
581
581
582
582
583 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
583 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
584 def update_tag_counter(tag_name, tag_value, count):
584 def update_tag_counter(tag_name, tag_value, count):
585 try:
585 try:
586 query = (
586 query = (
587 DBSession.query(Tag)
587 DBSession.query(Tag)
588 .filter(Tag.name == tag_name)
588 .filter(Tag.name == tag_name)
589 .filter(
589 .filter(
590 sa.cast(Tag.value, sa.types.TEXT)
590 sa.cast(Tag.value, sa.types.TEXT)
591 == sa.cast(json.dumps(tag_value), sa.types.TEXT)
591 == sa.cast(json.dumps(tag_value), sa.types.TEXT)
592 )
592 )
593 )
593 )
594 query.update(
594 query.update(
595 {"times_seen": Tag.times_seen + count, "last_timestamp": datetime.utcnow()},
595 {"times_seen": Tag.times_seen + count, "last_timestamp": datetime.utcnow()},
596 synchronize_session=False,
596 synchronize_session=False,
597 )
597 )
598 session = DBSession()
598 session = DBSession()
599 mark_changed(session)
599 mark_changed(session)
600 return True
600 return True
601 except Exception as exc:
601 except Exception as exc:
602 print_traceback(log)
602 print_traceback(log)
603 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
603 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
604 raise
604 raise
605 update_tag_counter.retry(exc=exc)
605 update_tag_counter.retry(exc=exc)
606
606
607
607
608 @celery.task(queue="default")
608 @celery.task(queue="default")
609 def update_tag_counters():
609 def update_tag_counters():
610 """
610 """
611 Schedules tasks that update the counters for application tags
611 Schedules tasks that update the counters for application tags
612 """
612 """
613 tags = Datastores.redis.lrange(REDIS_KEYS["seen_tag_list"], 0, -1)
613 tags = Datastores.redis.lrange(REDIS_KEYS["seen_tag_list"], 0, -1)
614 Datastores.redis.delete(REDIS_KEYS["seen_tag_list"])
614 Datastores.redis.delete(REDIS_KEYS["seen_tag_list"])
615 c = collections.Counter(tags)
615 c = collections.Counter(tags)
616 for t_json, count in c.items():
616 for t_json, count in c.items():
617 tag_info = json.loads(t_json)
617 tag_info = json.loads(t_json)
618 update_tag_counter.delay(tag_info[0], tag_info[1], count)
618 update_tag_counter.delay(tag_info[0], tag_info[1], count)
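# Note: entries on the "seen_tag_list" redis list are JSON-encoded
# [name, value] pairs; collections.Counter() folds identical pairs together so
# one update_tag_counter task is queued per distinct tag with its total count,
# e.g. (values illustrative) '["server_name", "web01"]' seen 7 times becomes
# update_tag_counter.delay("server_name", "web01", 7).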
619
619
620
620
621 @celery.task(queue="default")
621 @celery.task(queue="default")
622 def daily_digest():
622 def daily_digest():
623 """
623 """
624 Sends daily digest with top 50 error reports
624 Sends daily digest with top 50 error reports
625 """
625 """
626 request = get_current_request()
626 request = get_current_request()
627 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
627 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
628 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
628 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
629 since_when = datetime.utcnow() - timedelta(hours=8)
629 since_when = datetime.utcnow() - timedelta(hours=8)
630 log.warning("Generating daily digests")
630 log.warning("Generating daily digests")
631 for resource_id in apps:
631 for resource_id in apps:
632 resource_id = resource_id.decode("utf8")
632 resource_id = resource_id.decode("utf8")
633 end_date = datetime.utcnow().replace(microsecond=0, second=0)
633 end_date = datetime.utcnow().replace(microsecond=0, second=0)
634 filter_settings = {
634 filter_settings = {
635 "resource": [resource_id],
635 "resource": [resource_id],
636 "tags": [{"name": "type", "value": ["error"], "op": None}],
636 "tags": [{"name": "type", "value": ["error"], "op": None}],
637 "type": "error",
637 "type": "error",
638 "start_date": since_when,
638 "start_date": since_when,
639 "end_date": end_date,
639 "end_date": end_date,
640 }
640 }
641
641
642 reports = ReportGroupService.get_trending(
642 reports = ReportGroupService.get_trending(
643 request, filter_settings=filter_settings, limit=50
643 request, filter_settings=filter_settings, limit=50
644 )
644 )
645
645
646 application = ApplicationService.by_id(resource_id)
646 application = ApplicationService.by_id(resource_id)
647 if application:
647 if application:
648 users = set(
648 users = set(
649 [p.user for p in ResourceService.users_for_perm(application, "view")]
649 [p.user for p in ResourceService.users_for_perm(application, "view")]
650 )
650 )
651 for user in users:
651 for user in users:
652 user.send_digest(
652 user.send_digest(
653 request, application, reports=reports, since_when=since_when
653 request, application, reports=reports, since_when=since_when
654 )
654 )
655
655
656
656
657 @celery.task(queue="default")
657 @celery.task(queue="default")
658 def notifications_reports():
658 def notifications_reports():
659 """
659 """
660 Loop that checks redis for new report data and then schedules celery
660 Loop that checks redis for new report data and then schedules celery
661 tasks that send out the notifications
661 tasks that send out the notifications
662 """
662 """
663 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
663 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
664 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
664 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
665 for app in apps:
665 for app in apps:
666 log.warning("Notify for app: %s" % app)
666 log.warning("Notify for app: %s" % app)
667 check_user_report_notifications.delay(app.decode("utf8"))
667 check_user_report_notifications.delay(app.decode("utf8"))
668
668
669
669
670 @celery.task(queue="default")
670 @celery.task(queue="default")
671 def alerting_reports():
671 def alerting_reports():
672 """
672 """
673 Loop that checks redis for new report data and then schedules celery
673 Loop that checks redis for new report data and then schedules celery
674 tasks to determine:
674 tasks to determine:
675 - which applications should have new alerts opened
675 - which applications should have new alerts opened
676 """
676 """
677
677
678 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports_alerting"])
678 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports_alerting"])
679 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports_alerting"])
679 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports_alerting"])
680 for app in apps:
680 for app in apps:
681 log.warning("Notify for app: %s" % app)
681 log.warning("Notify for app: %s" % app)
682 check_alerts.delay(app.decode("utf8"))
682 check_alerts.delay(app.decode("utf8"))
683
683
684
684
685 @celery.task(
685 @celery.task(
686 queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4, max_retries=144
686 queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4, max_retries=144
687 )
687 )
688 def logs_cleanup(resource_id, filter_settings):
688 def logs_cleanup(resource_id, filter_settings):
689 request = get_current_request()
689 request = get_current_request()
690 request.tm.begin()
690 request.tm.begin()
691 es_query = {
691 es_query = {
692 "query": {
692 "query": {
693 "bool": {"filter": [{"term": {"resource_id": resource_id}}]}
693 "bool": {"filter": [{"term": {"resource_id": resource_id}}]}
694 }
694 }
695 }
695 }
696
696
697 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
697 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
698 if filter_settings["namespace"]:
698 if filter_settings["namespace"]:
699 query = query.filter(Log.namespace == filter_settings["namespace"][0])
699 query = query.filter(Log.namespace == filter_settings["namespace"][0])
700 es_query["query"]["bool"]["filter"].append(
700 es_query["query"]["bool"]["filter"].append(
701 {"term": {"namespace": filter_settings["namespace"][0]}}
701 {"term": {"namespace": filter_settings["namespace"][0]}}
702 )
702 )
703 query.delete(synchronize_session=False)
703 query.delete(synchronize_session=False)
704 request.tm.commit()
704 request.tm.commit()
705 Datastores.es.delete_by_query(
705 Datastores.es.delete_by_query(
706 index="rcae_l_*", doc_type="log", body=es_query, conflicts="proceed"
706 index="rcae_l_*", doc_type="log", body=es_query, conflicts="proceed"
707 )
707 )
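# Note: logs_cleanup() deletes the SQL rows and commits first, then purges the
# matching documents from every "rcae_l_*" index; conflicts="proceed" tells
# delete_by_query to carry on past version conflicts instead of aborting.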
@@ -1,132 +1,132 @@
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 #
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
7 # You may obtain a copy of the License at
8 #
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
10 #
11 # Unless required by applicable law or agreed to in writing, software
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
15 # limitations under the License.
16
16
17 import sqlalchemy as sa
17 import sqlalchemy as sa
18 import logging
18 import logging
19 import hashlib
19 import hashlib
20
20
21 from datetime import datetime
21 from datetime import datetime
22 from appenlight.models import Base
22 from appenlight.models import Base
23 from appenlight.lib.utils import convert_es_type
23 from appenlight.lib.utils import convert_es_type
24 from appenlight.lib.enums import LogLevel
24 from appenlight.lib.enums import LogLevel
25 from sqlalchemy.dialects.postgresql import JSON
25 from sqlalchemy.dialects.postgresql import JSON
26 from ziggurat_foundations.models.base import BaseModel
26 from ziggurat_foundations.models.base import BaseModel
27
27
28 log = logging.getLogger(__name__)
28 log = logging.getLogger(__name__)
29
29
30
30
31 class Log(Base, BaseModel):
31 class Log(Base, BaseModel):
32 __tablename__ = "logs"
32 __tablename__ = "logs"
33 __table_args__ = {"implicit_returning": False}
33 __table_args__ = {"implicit_returning": False}
34
34
35 log_id = sa.Column(sa.BigInteger(), nullable=False, primary_key=True)
35 log_id = sa.Column(sa.BigInteger(), nullable=False, primary_key=True)
36 resource_id = sa.Column(
36 resource_id = sa.Column(
37 sa.Integer(),
37 sa.Integer(),
38 sa.ForeignKey(
38 sa.ForeignKey(
39 "applications.resource_id", onupdate="CASCADE", ondelete="CASCADE"
39 "applications.resource_id", onupdate="CASCADE", ondelete="CASCADE"
40 ),
40 ),
41 nullable=False,
41 nullable=False,
42 index=True,
42 index=True,
43 )
43 )
44 log_level = sa.Column(sa.Unicode, nullable=False, index=True, default="INFO")
44 log_level = sa.Column(sa.Unicode, nullable=False, index=True, default="INFO")
45 message = sa.Column(sa.UnicodeText(), default="")
45 message = sa.Column(sa.UnicodeText(), default="")
46 timestamp = sa.Column(
46 timestamp = sa.Column(
47 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
47 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
48 )
48 )
49 request_id = sa.Column(sa.Unicode())
49 request_id = sa.Column(sa.Unicode())
50 namespace = sa.Column(sa.Unicode())
50 namespace = sa.Column(sa.Unicode())
51 primary_key = sa.Column(sa.Unicode())
51 primary_key = sa.Column(sa.Unicode())
52
52
53 tags = sa.Column(JSON(), default={})
53 tags = sa.Column(JSON(), default={})
54 permanent = sa.Column(sa.Boolean(), nullable=False, default=False)
54 permanent = sa.Column(sa.Boolean(), nullable=False, default=False)
55
55
56 def __str__(self):
56 def __str__(self):
57 return self.__unicode__().encode("utf8")
57 return self.__unicode__().encode("utf8")
58
58
59 def __unicode__(self):
59 def __unicode__(self):
60 return "<Log id:%s, lv:%s, ns:%s >" % (
60 return "<Log id:%s, lv:%s, ns:%s >" % (
61 self.log_id,
61 self.log_id,
62 self.log_level,
62 self.log_level,
63 self.namespace,
63 self.namespace,
64 )
64 )
65
65
66 def set_data(self, data, resource):
66 def set_data(self, data, resource):
67 level = data.get("log_level").upper()
67 level = data.get("log_level").upper()
68 self.log_level = getattr(LogLevel, level, LogLevel.UNKNOWN)
68 self.log_level = getattr(LogLevel, level, LogLevel.UNKNOWN)
69 self.message = data.get("message", "")
69 self.message = data.get("message", "")
70 server_name = data.get("server", "").lower() or "unknown"
70 server_name = data.get("server", "").lower() or "unknown"
71 self.tags = {"server_name": server_name}
71 self.tags = {"server_name": server_name}
72 if data.get("tags"):
72 if data.get("tags"):
73 for tag_tuple in data["tags"]:
73 for tag_tuple in data["tags"]:
74 self.tags[tag_tuple[0]] = tag_tuple[1]
74 self.tags[tag_tuple[0]] = tag_tuple[1]
75 self.timestamp = data["date"]
75 self.timestamp = data["date"]
76 r_id = data.get("request_id", "")
76 r_id = data.get("request_id", "")
77 if not r_id:
77 if not r_id:
78 r_id = ""
78 r_id = ""
79 self.request_id = r_id.replace("-", "")
79 self.request_id = r_id.replace("-", "")
80 self.resource_id = resource.resource_id
80 self.resource_id = resource.resource_id
81 self.namespace = data.get("namespace") or ""
81 self.namespace = data.get("namespace") or ""
82 self.permanent = data.get("permanent")
82 self.permanent = data.get("permanent")
83 self.primary_key = data.get("primary_key")
83 self.primary_key = data.get("primary_key")
84 if self.primary_key is not None:
84 if self.primary_key is not None:
85 self.tags["appenlight_primary_key"] = self.primary_key
85 self.tags["appenlight_primary_key"] = self.primary_key
86
86
87 def get_dict(self):
87 def get_dict(self):
88 instance_dict = super(Log, self).get_dict()
88 instance_dict = super(Log, self).get_dict()
89 instance_dict["log_level"] = LogLevel.key_from_value(self.log_level)
89 instance_dict["log_level"] = LogLevel.key_from_value(self.log_level)
90 instance_dict["resource_name"] = self.application.resource_name
90 instance_dict["resource_name"] = self.application.resource_name
91 return instance_dict
91 return instance_dict
92
92
93 @property
93 @property
94 def delete_hash(self):
94 def delete_hash(self):
95 if not self.primary_key:
95 if not self.primary_key:
96 return None
96 return None
97
97
98 to_hash = "{}_{}_{}".format(self.resource_id, self.primary_key, self.namespace)
98 to_hash = "{}_{}_{}".format(self.resource_id, self.primary_key, self.namespace)
99 return hashlib.sha1(to_hash.encode("utf8")).hexdigest()
99 return hashlib.sha1(to_hash.encode("utf8")).hexdigest()
100
100
101 def es_doc(self):
101 def es_doc(self):
102 tags = {}
102 tags = {}
103 tag_list = []
103 tag_list = []
104 for name, value in self.tags.items():
104 for name, value in self.tags.items():
105 # replace dot in indexed tag name
105 # replace dot in indexed tag name
106 name = name.replace(".", "_")
106 name = name.replace(".", "_")
107 tag_list.append(name)
107 tag_list.append(name)
108 tags[name] = {
108 tags[name] = {
109 "values": convert_es_type(value),
109 "values": convert_es_type(value),
110 "numeric_values": value
110 "numeric_values": value
111 if (isinstance(value, (int, float)) and not isinstance(value, bool))
111 if (isinstance(value, (int, float)) and not isinstance(value, bool))
112 else None,
112 else None,
113 }
113 }
114 return {
114 return {
115 "pg_id": str(self.log_id),
115 "log_id": str(self.log_id),
116 "delete_hash": self.delete_hash,
116 "delete_hash": self.delete_hash,
117 "resource_id": self.resource_id,
117 "resource_id": self.resource_id,
118 "request_id": self.request_id,
118 "request_id": self.request_id,
119 "log_level": LogLevel.key_from_value(self.log_level),
119 "log_level": LogLevel.key_from_value(self.log_level),
120 "timestamp": self.timestamp,
120 "timestamp": self.timestamp,
121 "message": self.message if self.message else "",
121 "message": self.message if self.message else "",
122 "namespace": self.namespace if self.namespace else "",
122 "namespace": self.namespace if self.namespace else "",
123 "tags": tags,
123 "tags": tags,
124 "tag_list": tag_list,
124 "tag_list": tag_list,
125 }
125 }
126
126
127 @property
127 @property
128 def partition_id(self):
128 def partition_id(self):
129 if self.permanent:
129 if self.permanent:
130 return "rcae_l_%s" % self.timestamp.strftime("%Y_%m")
130 return "rcae_l_%s" % self.timestamp.strftime("%Y_%m")
131 else:
131 else:
132 return "rcae_l_%s" % self.timestamp.strftime("%Y_%m_%d")
132 return "rcae_l_%s" % self.timestamp.strftime("%Y_%m_%d")
@@ -1,68 +1,69 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 #
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
7 # You may obtain a copy of the License at
8 #
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
10 #
11 # Unless required by applicable law or agreed to in writing, software
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
15 # limitations under the License.
16
16
17 from datetime import datetime
17 from datetime import datetime
18
18
19 import sqlalchemy as sa
19 import sqlalchemy as sa
20 from sqlalchemy.dialects.postgresql import JSON
20 from sqlalchemy.dialects.postgresql import JSON
21
21
22 from ziggurat_foundations.models.base import BaseModel
22 from ziggurat_foundations.models.base import BaseModel
23 from appenlight.lib.utils import convert_es_type
23 from appenlight.lib.utils import convert_es_type
24 from appenlight.models import Base
24 from appenlight.models import Base
25
25
26
26
27 class Metric(Base, BaseModel):
27 class Metric(Base, BaseModel):
28 __tablename__ = "metrics"
28 __tablename__ = "metrics"
29 __table_args__ = {"implicit_returning": False}
29 __table_args__ = {"implicit_returning": False}
30
30
31 pkey = sa.Column(sa.BigInteger(), primary_key=True)
31 pkey = sa.Column(sa.BigInteger(), primary_key=True)
32 resource_id = sa.Column(
32 resource_id = sa.Column(
33 sa.Integer(),
33 sa.Integer(),
34 sa.ForeignKey("applications.resource_id"),
34 sa.ForeignKey("applications.resource_id"),
35 nullable=False,
35 nullable=False,
36 primary_key=True,
36 primary_key=True,
37 )
37 )
38 timestamp = sa.Column(
38 timestamp = sa.Column(
39 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
39 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
40 )
40 )
41 tags = sa.Column(JSON(), default={})
41 tags = sa.Column(JSON(), default={})
42 namespace = sa.Column(sa.Unicode(255))
42 namespace = sa.Column(sa.Unicode(255))
43
43
44 @property
44 @property
45 def partition_id(self):
45 def partition_id(self):
46 return "rcae_m_%s" % self.timestamp.strftime("%Y_%m_%d")
46 return "rcae_m_%s" % self.timestamp.strftime("%Y_%m_%d")
47
47
48 def es_doc(self):
48 def es_doc(self):
49 tags = {}
49 tags = {}
50 tag_list = []
50 tag_list = []
51 for name, value in self.tags.items():
51 for name, value in self.tags.items():
52 # replace dot in indexed tag name
52 # replace dot in indexed tag name
53 name = name.replace(".", "_")
53 name = name.replace(".", "_")
54 tag_list.append(name)
54 tag_list.append(name)
55 tags[name] = {
55 tags[name] = {
56 "values": convert_es_type(value),
56 "values": convert_es_type(value),
57 "numeric_values": value
57 "numeric_values": value
58 if (isinstance(value, (int, float)) and not isinstance(value, bool))
58 if (isinstance(value, (int, float)) and not isinstance(value, bool))
59 else None,
59 else None,
60 }
60 }
61
61
62 return {
62 return {
63 "metric_id": self.pkey,
63 "resource_id": self.resource_id,
64 "resource_id": self.resource_id,
64 "timestamp": self.timestamp,
65 "timestamp": self.timestamp,
65 "namespace": self.namespace,
66 "namespace": self.namespace,
66 "tags": tags,
67 "tags": tags,
67 "tag_list": tag_list,
68 "tag_list": tag_list,
68 }
69 }
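A sketch of the document shape Metric.es_doc() above produces, with invented values; note that a dotted tag name is rewritten with an underscore and that only numeric, non-boolean tag values populate numeric_values:

from datetime import datetime

metric = Metric()
metric.pkey = 7
metric.resource_id = 1
metric.timestamp = datetime(2017, 6, 15, 12, 30)
metric.namespace = "myapp.timers"
metric.tags = {"server.name": "web01", "response_time": 0.25}

doc = metric.es_doc()
# doc["metric_id"] == 7
# doc["tag_list"] == ["server_name", "response_time"]
# doc["tags"]["server_name"] == {"values": convert_es_type("web01"), "numeric_values": None}
# doc["tags"]["response_time"] == {"values": convert_es_type(0.25), "numeric_values": 0.25}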
@@ -1,529 +1,534 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 #
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
7 # You may obtain a copy of the License at
8 #
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
10 #
11 # Unless required by applicable law or agreed to in writing, software
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
15 # limitations under the License.
16
16
17 from datetime import datetime, timedelta
17 from datetime import datetime, timedelta
18 import math
18 import math
19 import uuid
19 import uuid
20 import hashlib
20 import hashlib
21 import copy
21 import copy
22 import urllib.parse
22 import urllib.parse
23 import logging
23 import logging
24 import sqlalchemy as sa
24 import sqlalchemy as sa
25
25
26 from appenlight.models import Base, Datastores
26 from appenlight.models import Base, Datastores
27 from appenlight.lib.utils.date_utils import convert_date
27 from appenlight.lib.utils.date_utils import convert_date
28 from appenlight.lib.utils import convert_es_type
28 from appenlight.lib.utils import convert_es_type
29 from appenlight.models.slow_call import SlowCall
29 from appenlight.models.slow_call import SlowCall
30 from appenlight.lib.utils import channelstream_request
30 from appenlight.lib.utils import channelstream_request
31 from appenlight.lib.enums import ReportType, Language
31 from appenlight.lib.enums import ReportType, Language
32 from pyramid.threadlocal import get_current_registry, get_current_request
32 from pyramid.threadlocal import get_current_registry, get_current_request
33 from sqlalchemy.dialects.postgresql import JSON
33 from sqlalchemy.dialects.postgresql import JSON
34 from ziggurat_foundations.models.base import BaseModel
34 from ziggurat_foundations.models.base import BaseModel
35
35
36 log = logging.getLogger(__name__)
36 log = logging.getLogger(__name__)
37
37
38 REPORT_TYPE_MATRIX = {
38 REPORT_TYPE_MATRIX = {
39 "http_status": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
39 "http_status": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
40 "group:priority": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
40 "group:priority": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
41 "duration": {"type": "float", "ops": ("ge", "le")},
41 "duration": {"type": "float", "ops": ("ge", "le")},
42 "url_domain": {
42 "url_domain": {
43 "type": "unicode",
43 "type": "unicode",
44 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
44 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
45 },
45 },
46 "url_path": {
46 "url_path": {
47 "type": "unicode",
47 "type": "unicode",
48 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
48 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
49 },
49 },
50 "error": {
50 "error": {
51 "type": "unicode",
51 "type": "unicode",
52 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
52 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
53 },
53 },
54 "tags:server_name": {
54 "tags:server_name": {
55 "type": "unicode",
55 "type": "unicode",
56 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
56 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
57 },
57 },
58 "traceback": {"type": "unicode", "ops": ("contains",)},
58 "traceback": {"type": "unicode", "ops": ("contains",)},
59 "group:occurences": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
59 "group:occurences": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
60 }
60 }
61
61
62
62
63 class Report(Base, BaseModel):
63 class Report(Base, BaseModel):
64 __tablename__ = "reports"
64 __tablename__ = "reports"
65 __table_args__ = {"implicit_returning": False}
65 __table_args__ = {"implicit_returning": False}
66
66
67 id = sa.Column(sa.Integer, nullable=False, primary_key=True)
67 id = sa.Column(sa.Integer, nullable=False, primary_key=True)
68 group_id = sa.Column(
68 group_id = sa.Column(
69 sa.BigInteger,
69 sa.BigInteger,
70 sa.ForeignKey("reports_groups.id", ondelete="cascade", onupdate="cascade"),
70 sa.ForeignKey("reports_groups.id", ondelete="cascade", onupdate="cascade"),
71 )
71 )
72 resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
72 resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
73 report_type = sa.Column(sa.Integer(), nullable=False, index=True)
73 report_type = sa.Column(sa.Integer(), nullable=False, index=True)
74 error = sa.Column(sa.UnicodeText(), index=True)
74 error = sa.Column(sa.UnicodeText(), index=True)
75 extra = sa.Column(JSON(), default={})
75 extra = sa.Column(JSON(), default={})
76 request = sa.Column(JSON(), nullable=False, default={})
76 request = sa.Column(JSON(), nullable=False, default={})
77 ip = sa.Column(sa.String(39), index=True, default="")
77 ip = sa.Column(sa.String(39), index=True, default="")
78 username = sa.Column(sa.Unicode(255), default="")
78 username = sa.Column(sa.Unicode(255), default="")
79 user_agent = sa.Column(sa.Unicode(255), default="")
79 user_agent = sa.Column(sa.Unicode(255), default="")
80 url = sa.Column(sa.UnicodeText(), index=True)
80 url = sa.Column(sa.UnicodeText(), index=True)
81 request_id = sa.Column(sa.Text())
81 request_id = sa.Column(sa.Text())
82 request_stats = sa.Column(JSON(), nullable=False, default={})
82 request_stats = sa.Column(JSON(), nullable=False, default={})
83 traceback = sa.Column(JSON(), nullable=False, default=None)
83 traceback = sa.Column(JSON(), nullable=False, default=None)
84 traceback_hash = sa.Column(sa.Text())
84 traceback_hash = sa.Column(sa.Text())
85 start_time = sa.Column(
85 start_time = sa.Column(
86 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
86 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
87 )
87 )
88 end_time = sa.Column(sa.DateTime())
88 end_time = sa.Column(sa.DateTime())
89 duration = sa.Column(sa.Float, default=0)
89 duration = sa.Column(sa.Float, default=0)
90 http_status = sa.Column(sa.Integer, index=True)
90 http_status = sa.Column(sa.Integer, index=True)
91 url_domain = sa.Column(sa.Unicode(100), index=True)
91 url_domain = sa.Column(sa.Unicode(100), index=True)
92 url_path = sa.Column(sa.Unicode(255), index=True)
92 url_path = sa.Column(sa.Unicode(255), index=True)
93 tags = sa.Column(JSON(), nullable=False, default={})
93 tags = sa.Column(JSON(), nullable=False, default={})
94 language = sa.Column(sa.Integer(), default=0)
94 language = sa.Column(sa.Integer(), default=0)
95 # this is used to determine the partition for the report
95 # this is used to determine the partition for the report
96 report_group_time = sa.Column(
96 report_group_time = sa.Column(
97 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
97 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
98 )
98 )
99
99
100 logs = sa.orm.relationship(
100 logs = sa.orm.relationship(
101 "Log",
101 "Log",
102 lazy="dynamic",
102 lazy="dynamic",
103 passive_deletes=True,
103 passive_deletes=True,
104 passive_updates=True,
104 passive_updates=True,
105 primaryjoin="and_(Report.request_id==Log.request_id, "
105 primaryjoin="and_(Report.request_id==Log.request_id, "
106 "Log.request_id != None, Log.request_id != '')",
106 "Log.request_id != None, Log.request_id != '')",
107 foreign_keys="[Log.request_id]",
107 foreign_keys="[Log.request_id]",
108 )
108 )
109
109
110 slow_calls = sa.orm.relationship(
110 slow_calls = sa.orm.relationship(
111 "SlowCall",
111 "SlowCall",
112 backref="detail",
112 backref="detail",
113 cascade="all, delete-orphan",
113 cascade="all, delete-orphan",
114 passive_deletes=True,
114 passive_deletes=True,
115 passive_updates=True,
115 passive_updates=True,
116 order_by="SlowCall.timestamp",
116 order_by="SlowCall.timestamp",
117 )
117 )
118
118
119 def set_data(self, data, resource, protocol_version=None):
119 def set_data(self, data, resource, protocol_version=None):
120 self.http_status = data["http_status"]
120 self.http_status = data["http_status"]
121 self.priority = data["priority"]
121 self.priority = data["priority"]
122 self.error = data["error"]
122 self.error = data["error"]
123 report_language = data.get("language", "").lower()
123 report_language = data.get("language", "").lower()
124 self.language = getattr(Language, report_language, Language.unknown)
124 self.language = getattr(Language, report_language, Language.unknown)
125 # we need temp holder here to decide later
125 # we need a temp holder here to decide later
125 # we need a temp holder here to decide later
126 # if we want to commit the tags if the report is marked for creation
126 # if we want to commit the tags if the report is marked for creation
127 self.tags = {"server_name": data["server"], "view_name": data["view_name"]}
128 if data.get("tags"):
128 if data.get("tags"):
129 for tag_tuple in data["tags"]:
129 for tag_tuple in data["tags"]:
130 self.tags[tag_tuple[0]] = tag_tuple[1]
130 self.tags[tag_tuple[0]] = tag_tuple[1]
131 self.traceback = data["traceback"]
131 self.traceback = data["traceback"]
132 stripped_traceback = self.stripped_traceback()
132 stripped_traceback = self.stripped_traceback()
133 tb_repr = repr(stripped_traceback).encode("utf8")
133 tb_repr = repr(stripped_traceback).encode("utf8")
134 self.traceback_hash = hashlib.sha1(tb_repr).hexdigest()
134 self.traceback_hash = hashlib.sha1(tb_repr).hexdigest()
135 url_info = urllib.parse.urlsplit(data.get("url", ""), allow_fragments=False)
135 url_info = urllib.parse.urlsplit(data.get("url", ""), allow_fragments=False)
136 self.url_domain = url_info.netloc[:128]
136 self.url_domain = url_info.netloc[:128]
137 self.url_path = url_info.path[:2048]
137 self.url_path = url_info.path[:2048]
138 self.occurences = data["occurences"]
138 self.occurences = data["occurences"]
139 if self.error:
139 if self.error:
140 self.report_type = ReportType.error
140 self.report_type = ReportType.error
141 else:
141 else:
142 self.report_type = ReportType.slow
142 self.report_type = ReportType.slow
143
143
144 # but if the status is 404, treat it as a 404 report type
144 # but if the status is 404, treat it as a 404 report type
145 if self.http_status in [404, "404"] or self.error == "404 Not Found":
145 if self.http_status in [404, "404"] or self.error == "404 Not Found":
146 self.report_type = ReportType.not_found
146 self.report_type = ReportType.not_found
147 self.error = ""
147 self.error = ""
148
148
149 self.generate_grouping_hash(
149 self.generate_grouping_hash(
150 data.get("appenlight.group_string", data.get("group_string")),
150 data.get("appenlight.group_string", data.get("group_string")),
151 resource.default_grouping,
151 resource.default_grouping,
152 protocol_version,
152 protocol_version,
153 )
153 )
154
154
155 # details
155 # details
156 if data["http_status"] in [404, "404"]:
156 if data["http_status"] in [404, "404"]:
157 data = {
157 data = {
158 "username": data["username"],
158 "username": data["username"],
159 "ip": data["ip"],
159 "ip": data["ip"],
160 "url": data["url"],
160 "url": data["url"],
161 "user_agent": data["user_agent"],
161 "user_agent": data["user_agent"],
162 }
162 }
163 if data.get("HTTP_REFERER") or data.get("http_referer"):
163 if data.get("HTTP_REFERER") or data.get("http_referer"):
164 data["HTTP_REFERER"] = data.get("HTTP_REFERER", "") or data.get(
164 data["HTTP_REFERER"] = data.get("HTTP_REFERER", "") or data.get(
165 "http_referer", ""
165 "http_referer", ""
166 )
166 )
167
167
168 self.resource_id = resource.resource_id
168 self.resource_id = resource.resource_id
169 self.username = data["username"]
169 self.username = data["username"]
170 self.user_agent = data["user_agent"]
170 self.user_agent = data["user_agent"]
171 self.ip = data["ip"]
171 self.ip = data["ip"]
172 self.extra = {}
172 self.extra = {}
173 if data.get("extra"):
173 if data.get("extra"):
174 for extra_tuple in data["extra"]:
174 for extra_tuple in data["extra"]:
175 self.extra[extra_tuple[0]] = extra_tuple[1]
175 self.extra[extra_tuple[0]] = extra_tuple[1]
176
176
177 self.url = data["url"]
177 self.url = data["url"]
178 self.request_id = data.get("request_id", "").replace("-", "") or str(
178 self.request_id = data.get("request_id", "").replace("-", "") or str(
179 uuid.uuid4()
179 uuid.uuid4()
180 )
180 )
181 request_data = data.get("request", {})
181 request_data = data.get("request", {})
182
182
183 self.request = request_data
183 self.request = request_data
184 self.request_stats = data.get("request_stats") or {}
184 self.request_stats = data.get("request_stats") or {}
185 traceback = data.get("traceback")
185 traceback = data.get("traceback")
186 if not traceback:
186 if not traceback:
187 traceback = data.get("frameinfo")
187 traceback = data.get("frameinfo")
188 self.traceback = traceback
188 self.traceback = traceback
189 start_date = convert_date(data.get("start_time"))
189 start_date = convert_date(data.get("start_time"))
190 if not self.start_time or self.start_time < start_date:
190 if not self.start_time or self.start_time < start_date:
191 self.start_time = start_date
191 self.start_time = start_date
192
192
193 self.end_time = convert_date(data.get("end_time"), False)
193 self.end_time = convert_date(data.get("end_time"), False)
194 self.duration = 0
194 self.duration = 0
195
195
196 if self.start_time and self.end_time:
196 if self.start_time and self.end_time:
197 d = self.end_time - self.start_time
197 d = self.end_time - self.start_time
198 self.duration = d.total_seconds()
198 self.duration = d.total_seconds()
199
199
200 # update tags with other vars
200 # update tags with other vars
201 if self.username:
201 if self.username:
202 self.tags["user_name"] = self.username
202 self.tags["user_name"] = self.username
203 self.tags["report_language"] = Language.key_from_value(self.language)
203 self.tags["report_language"] = Language.key_from_value(self.language)
204
204
205 def add_slow_calls(self, data, report_group):
205 def add_slow_calls(self, data, report_group):
206 slow_calls = []
206 slow_calls = []
207 for call in data.get("slow_calls", []):
207 for call in data.get("slow_calls", []):
208 sc_inst = SlowCall()
208 sc_inst = SlowCall()
209 sc_inst.set_data(
209 sc_inst.set_data(
210 call, resource_id=self.resource_id, report_group=report_group
210 call, resource_id=self.resource_id, report_group=report_group
211 )
211 )
212 slow_calls.append(sc_inst)
212 slow_calls.append(sc_inst)
213 self.slow_calls.extend(slow_calls)
213 self.slow_calls.extend(slow_calls)
214 return slow_calls
214 return slow_calls
215
215
216 def get_dict(self, request, details=False, exclude_keys=None, include_keys=None):
216 def get_dict(self, request, details=False, exclude_keys=None, include_keys=None):
217 from appenlight.models.services.report_group import ReportGroupService
217 from appenlight.models.services.report_group import ReportGroupService
218
218
219 instance_dict = super(Report, self).get_dict()
219 instance_dict = super(Report, self).get_dict()
220 instance_dict["req_stats"] = self.req_stats()
220 instance_dict["req_stats"] = self.req_stats()
221 instance_dict["group"] = {}
221 instance_dict["group"] = {}
222 instance_dict["group"]["id"] = self.report_group.id
222 instance_dict["group"]["id"] = self.report_group.id
223 instance_dict["group"]["total_reports"] = self.report_group.total_reports
223 instance_dict["group"]["total_reports"] = self.report_group.total_reports
224 instance_dict["group"]["last_report"] = self.report_group.last_report
224 instance_dict["group"]["last_report"] = self.report_group.last_report
225 instance_dict["group"]["priority"] = self.report_group.priority
225 instance_dict["group"]["priority"] = self.report_group.priority
226 instance_dict["group"]["occurences"] = self.report_group.occurences
226 instance_dict["group"]["occurences"] = self.report_group.occurences
227 instance_dict["group"]["last_timestamp"] = self.report_group.last_timestamp
227 instance_dict["group"]["last_timestamp"] = self.report_group.last_timestamp
228 instance_dict["group"]["first_timestamp"] = self.report_group.first_timestamp
228 instance_dict["group"]["first_timestamp"] = self.report_group.first_timestamp
229 instance_dict["group"]["public"] = self.report_group.public
229 instance_dict["group"]["public"] = self.report_group.public
230 instance_dict["group"]["fixed"] = self.report_group.fixed
230 instance_dict["group"]["fixed"] = self.report_group.fixed
231 instance_dict["group"]["read"] = self.report_group.read
231 instance_dict["group"]["read"] = self.report_group.read
232 instance_dict["group"]["average_duration"] = self.report_group.average_duration
232 instance_dict["group"]["average_duration"] = self.report_group.average_duration
233
233
234 instance_dict["resource_name"] = self.report_group.application.resource_name
234 instance_dict["resource_name"] = self.report_group.application.resource_name
235 instance_dict["report_type"] = self.report_type
235 instance_dict["report_type"] = self.report_type
236
236
237 if instance_dict["http_status"] == 404 and not instance_dict["error"]:
237 if instance_dict["http_status"] == 404 and not instance_dict["error"]:
238 instance_dict["error"] = "404 Not Found"
238 instance_dict["error"] = "404 Not Found"
239
239
240 if details:
240 if details:
241 instance_dict[
241 instance_dict[
242 "affected_users_count"
242 "affected_users_count"
243 ] = ReportGroupService.affected_users_count(self.report_group)
243 ] = ReportGroupService.affected_users_count(self.report_group)
244 instance_dict["top_affected_users"] = [
244 instance_dict["top_affected_users"] = [
245 {"username": u.username, "count": u.count}
245 {"username": u.username, "count": u.count}
246 for u in ReportGroupService.top_affected_users(self.report_group)
246 for u in ReportGroupService.top_affected_users(self.report_group)
247 ]
247 ]
248 instance_dict["application"] = {"integrations": []}
248 instance_dict["application"] = {"integrations": []}
249 for integration in self.report_group.application.integrations:
249 for integration in self.report_group.application.integrations:
250 if integration.front_visible:
250 if integration.front_visible:
251 instance_dict["application"]["integrations"].append(
251 instance_dict["application"]["integrations"].append(
252 {
252 {
253 "name": integration.integration_name,
253 "name": integration.integration_name,
254 "action": integration.integration_action,
254 "action": integration.integration_action,
255 }
255 }
256 )
256 )
257 instance_dict["comments"] = [
257 instance_dict["comments"] = [
258 c.get_dict() for c in self.report_group.comments
258 c.get_dict() for c in self.report_group.comments
259 ]
259 ]
260
260
261 instance_dict["group"]["next_report"] = None
261 instance_dict["group"]["next_report"] = None
262 instance_dict["group"]["previous_report"] = None
262 instance_dict["group"]["previous_report"] = None
263 next_in_group = self.get_next_in_group(request)
263 next_in_group = self.get_next_in_group(request)
264 previous_in_group = self.get_previous_in_group(request)
264 previous_in_group = self.get_previous_in_group(request)
265 if next_in_group:
265 if next_in_group:
266 instance_dict["group"]["next_report"] = next_in_group
266 instance_dict["group"]["next_report"] = next_in_group
267 if previous_in_group:
267 if previous_in_group:
268 instance_dict["group"]["previous_report"] = previous_in_group
268 instance_dict["group"]["previous_report"] = previous_in_group
269
269
270 # slow call ordering
270 # slow call ordering
271 def find_parent(row, data):
271 def find_parent(row, data):
272 for r in reversed(data):
272 for r in reversed(data):
273 try:
273 try:
274 if (
274 if (
275 row["timestamp"] > r["timestamp"]
275 row["timestamp"] > r["timestamp"]
276 and row["end_time"] < r["end_time"]
276 and row["end_time"] < r["end_time"]
277 ):
277 ):
278 return r
278 return r
279 except TypeError as e:
279 except TypeError as e:
280 log.warning("reports_view.find_parent: %s" % e)
280 log.warning("reports_view.find_parent: %s" % e)
281 return None
281 return None
282
282
283 new_calls = []
283 new_calls = []
284 calls = [c.get_dict() for c in self.slow_calls]
284 calls = [c.get_dict() for c in self.slow_calls]
285 while calls:
285 while calls:
286 # start from end
286 # start from end
287 for x in range(len(calls) - 1, -1, -1):
287 for x in range(len(calls) - 1, -1, -1):
288 parent = find_parent(calls[x], calls)
288 parent = find_parent(calls[x], calls)
289 if parent:
289 if parent:
290 parent["children"].append(calls[x])
290 parent["children"].append(calls[x])
291 else:
291 else:
292 # no parent at all? append to new calls anyway
292 # no parent at all? append to new calls anyway
293 new_calls.append(calls[x])
293 new_calls.append(calls[x])
294 # print 'append', calls[x]
294 # print 'append', calls[x]
295 del calls[x]
295 del calls[x]
296 break
296 break
297 instance_dict["slow_calls"] = new_calls
297 instance_dict["slow_calls"] = new_calls
298
298
299 instance_dict["front_url"] = self.get_public_url(request)
299 instance_dict["front_url"] = self.get_public_url(request)
300
300
301 exclude_keys_list = exclude_keys or []
301 exclude_keys_list = exclude_keys or []
302 include_keys_list = include_keys or []
302 include_keys_list = include_keys or []
303 for k in list(instance_dict.keys()):
303 for k in list(instance_dict.keys()):
304 if k == "group":
304 if k == "group":
305 continue
305 continue
306 if k in exclude_keys_list or (k not in include_keys_list and include_keys):
306 if k in exclude_keys_list or (k not in include_keys_list and include_keys):
307 del instance_dict[k]
307 del instance_dict[k]
308 return instance_dict
308 return instance_dict
309
309
310 def get_previous_in_group(self, request):
310 def get_previous_in_group(self, request):
311 query = {
311 query = {
312 "size": 1,
312 "size": 1,
313 "query": {
313 "query": {
314 "bool": {
314 "bool": {
315 "filter": [
315 "filter": [
316 {"term": {"group_id": self.group_id}},
316 {"term": {"group_id": self.group_id}},
317 {"range": {"pg_id": {"lt": self.id}}},
317 {"range": {"report_id": {"lt": self.id}}},
318 ]
318 ]
319 }
319 }
320 },
320 },
321 "sort": [{"_doc": {"order": "desc"}}],
321 "sort": [{"_doc": {"order": "desc"}}],
322 }
322 }
323 result = request.es_conn.search(
323 result = request.es_conn.search(
324 body=query, index=self.partition_id, doc_type="report"
324 body=query, index=self.partition_id, doc_type="report"
325 )
325 )
326 if result["hits"]["total"]:
326 if result["hits"]["total"]:
327 return result["hits"]["hits"][0]["_source"]["pg_id"]
327 return result["hits"]["hits"][0]["_source"]["report_id"]
328
328
329 def get_next_in_group(self, request):
329 def get_next_in_group(self, request):
330 query = {
330 query = {
331 "size": 1,
331 "size": 1,
332 "query": {
332 "query": {
333 "bool": {
333 "bool": {
334 "filter": [
334 "filter": [
335 {"term": {"group_id": self.group_id}},
335 {"term": {"group_id": self.group_id}},
336 {"range": {"pg_id": {"gt": self.id}}},
336 {"range": {"report_id": {"gt": self.id}}},
337 ]
337 ]
338 }
338 }
339 },
339 },
340 "sort": [{"_doc": {"order": "asc"}}],
340 "sort": [{"_doc": {"order": "asc"}}],
341 }
341 }
342 result = request.es_conn.search(
342 result = request.es_conn.search(
343 body=query, index=self.partition_id, doc_type="report"
343 body=query, index=self.partition_id, doc_type="report"
344 )
344 )
345 if result["hits"]["total"]:
345 if result["hits"]["total"]:
346 return result["hits"]["hits"][0]["_source"]["pg_id"]
346 return result["hits"]["hits"][0]["_source"]["report_id"]
347
347
348 def get_public_url(self, request=None, report_group=None, _app_url=None):
348 def get_public_url(self, request=None, report_group=None, _app_url=None):
349 """
349 """
350 Returns the URL a user can use to visit a specific report
350 Returns the URL a user can use to visit a specific report
351 """
351 """
352 if not request:
352 if not request:
353 request = get_current_request()
353 request = get_current_request()
354 url = request.route_url("/", _app_url=_app_url)
354 url = request.route_url("/", _app_url=_app_url)
355 if report_group:
355 if report_group:
356 return (url + "ui/report/%s/%s") % (report_group.id, self.id)
356 return (url + "ui/report/%s/%s") % (report_group.id, self.id)
357 return (url + "ui/report/%s/%s") % (self.group_id, self.id)
357 return (url + "ui/report/%s/%s") % (self.group_id, self.id)
358
358
359 def req_stats(self):
359 def req_stats(self):
360 stats = self.request_stats.copy()
360 stats = self.request_stats.copy()
361 stats["percentages"] = {}
361 stats["percentages"] = {}
362 stats["percentages"]["main"] = 100.0
362 stats["percentages"]["main"] = 100.0
363 main = stats.get("main", 0.0)
363 main = stats.get("main", 0.0)
364 if not main:
364 if not main:
365 return None
365 return None
366 for name, call_time in stats.items():
366 for name, call_time in stats.items():
367 if "calls" not in name and "main" not in name and "percentages" not in name:
367 if "calls" not in name and "main" not in name and "percentages" not in name:
368 stats["main"] -= call_time
368 stats["main"] -= call_time
369 stats["percentages"][name] = math.floor((call_time / main * 100.0))
369 stats["percentages"][name] = math.floor((call_time / main * 100.0))
370 stats["percentages"]["main"] -= stats["percentages"][name]
370 stats["percentages"]["main"] -= stats["percentages"][name]
371 if stats["percentages"]["main"] < 0.0:
371 if stats["percentages"]["main"] < 0.0:
372 stats["percentages"]["main"] = 0.0
372 stats["percentages"]["main"] = 0.0
373 stats["main"] = 0.0
373 stats["main"] = 0.0
374 return stats
374 return stats
375
375
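# A worked example for req_stats() above, using invented numbers: with
# request_stats == {"main": 4.0, "sql": 1.0, "sql_calls": 3, "tmpl": 2.0},
# the loop skips "main", anything containing "calls" and the "percentages"
# holder, subtracts the remaining call times from "main" and floors their
# share of the total, so the method returns
# {"main": 1.0, "sql": 1.0, "sql_calls": 3, "tmpl": 2.0,
#  "percentages": {"main": 25.0, "sql": 25, "tmpl": 50}}.
# If the subtractions overshoot, "main" and its percentage are clamped to 0.0.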
376 def generate_grouping_hash(
376 def generate_grouping_hash(
377 self, hash_string=None, default_grouping=None, protocol_version=None
377 self, hash_string=None, default_grouping=None, protocol_version=None
378 ):
378 ):
379 """
379 """
380 Generates SHA1 hash that will be used to group reports together
380 Generates SHA1 hash that will be used to group reports together
381 """
381 """
382 if not hash_string:
382 if not hash_string:
383 location = self.tags.get("view_name") or self.url_path
383 location = self.tags.get("view_name") or self.url_path
384 server_name = self.tags.get("server_name") or ""
384 server_name = self.tags.get("server_name") or ""
385 if default_grouping == "url_traceback":
385 if default_grouping == "url_traceback":
386 hash_string = "%s_%s_%s" % (self.traceback_hash, location, self.error)
386 hash_string = "%s_%s_%s" % (self.traceback_hash, location, self.error)
387 if self.language == Language.javascript:
387 if self.language == Language.javascript:
388 hash_string = "%s_%s" % (self.traceback_hash, self.error)
388 hash_string = "%s_%s" % (self.traceback_hash, self.error)
389
389
390 elif default_grouping == "traceback_server":
390 elif default_grouping == "traceback_server":
391 hash_string = "%s_%s" % (self.traceback_hash, server_name)
391 hash_string = "%s_%s" % (self.traceback_hash, server_name)
392 if self.language == Language.javascript:
392 if self.language == Language.javascript:
393 hash_string = "%s_%s" % (self.traceback_hash, server_name)
393 hash_string = "%s_%s" % (self.traceback_hash, server_name)
394 else:
394 else:
395 hash_string = "%s_%s" % (self.error, location)
395 hash_string = "%s_%s" % (self.error, location)
396 month = datetime.utcnow().date().replace(day=1)
396 month = datetime.utcnow().date().replace(day=1)
397 hash_string = "{}_{}".format(month, hash_string)
397 hash_string = "{}_{}".format(month, hash_string)
398 binary_string = hash_string.encode("utf8")
398 binary_string = hash_string.encode("utf8")
399 self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
399 self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
400 return self.grouping_hash
400 return self.grouping_hash
401
401
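# A sketch of the string generate_grouping_hash() above builds for
# default_grouping == "url_traceback", with invented values: given
# traceback_hash "abc123", view_name "myapp:index" and error
# "ValueError: boom", hash_string becomes
# "abc123_myapp:index_ValueError: boom", is then prefixed with the first day
# of the current month ("2017-06-01_abc123_myapp:index_ValueError: boom"),
# UTF-8 encoded and SHA1-hashed, so a group effectively starts fresh each
# calendar month.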
402 def stripped_traceback(self):
402 def stripped_traceback(self):
403 """
403 """
404 Traceback without local vars
404 Traceback without local vars
405 """
405 """
406 stripped_traceback = copy.deepcopy(self.traceback)
406 stripped_traceback = copy.deepcopy(self.traceback)
407
407
408 if isinstance(stripped_traceback, list):
408 if isinstance(stripped_traceback, list):
409 for row in stripped_traceback:
409 for row in stripped_traceback:
410 row.pop("vars", None)
410 row.pop("vars", None)
411 return stripped_traceback
411 return stripped_traceback
412
412
413 def notify_channel(self, report_group):
413 def notify_channel(self, report_group):
414 """
414 """
415 Sends notification to websocket channel
415 Sends notification to websocket channel
416 """
416 """
417 settings = get_current_registry().settings
417 settings = get_current_registry().settings
418 log.info("notify channelstream")
418 log.info("notify channelstream")
419 if self.report_type != ReportType.error:
419 if self.report_type != ReportType.error:
420 return
420 return
421 payload = {
421 payload = {
422 "type": "message",
422 "type": "message",
423 "user": "__system__",
423 "user": "__system__",
424 "channel": "app_%s" % self.resource_id,
424 "channel": "app_%s" % self.resource_id,
425 "message": {
425 "message": {
426 "topic": "front_dashboard.new_topic",
426 "topic": "front_dashboard.new_topic",
427 "report": {
427 "report": {
428 "group": {
428 "group": {
429 "priority": report_group.priority,
429 "priority": report_group.priority,
430 "first_timestamp": report_group.first_timestamp,
430 "first_timestamp": report_group.first_timestamp,
431 "last_timestamp": report_group.last_timestamp,
431 "last_timestamp": report_group.last_timestamp,
432 "average_duration": report_group.average_duration,
432 "average_duration": report_group.average_duration,
433 "occurences": report_group.occurences,
433 "occurences": report_group.occurences,
434 },
434 },
435 "report_id": self.id,
435 "report_id": self.id,
436 "group_id": self.group_id,
436 "group_id": self.group_id,
437 "resource_id": self.resource_id,
437 "resource_id": self.resource_id,
438 "http_status": self.http_status,
438 "http_status": self.http_status,
439 "url_domain": self.url_domain,
439 "url_domain": self.url_domain,
440 "url_path": self.url_path,
440 "url_path": self.url_path,
441 "error": self.error or "",
441 "error": self.error or "",
442 "server": self.tags.get("server_name"),
442 "server": self.tags.get("server_name"),
443 "view_name": self.tags.get("view_name"),
443 "view_name": self.tags.get("view_name"),
444 "front_url": self.get_public_url(),
444 "front_url": self.get_public_url(),
445 },
445 },
446 },
446 },
447 }
447 }
448 channelstream_request(
448 channelstream_request(
449 settings["cometd.secret"],
449 settings["cometd.secret"],
450 "/message",
450 "/message",
451 [payload],
451 [payload],
452 servers=[settings["cometd_servers"]],
452 servers=[settings["cometd_servers"]],
453 )
453 )
454
454
455 def es_doc(self):
455 def es_doc(self):
456 tags = {}
456 tags = {}
457 tag_list = []
457 tag_list = []
458 for name, value in self.tags.items():
458 for name, value in self.tags.items():
459 name = name.replace(".", "_")
459 name = name.replace(".", "_")
460 tag_list.append(name)
460 tag_list.append(name)
461 tags[name] = {
461 tags[name] = {
462 "values": convert_es_type(value),
462 "values": convert_es_type(value),
463 "numeric_values": value
463 "numeric_values": value
464 if (isinstance(value, (int, float)) and not isinstance(value, bool))
464 if (isinstance(value, (int, float)) and not isinstance(value, bool))
465 else None,
465 else None,
466 }
466 }
467
467
468 if "user_name" not in self.tags and self.username:
468 if "user_name" not in self.tags and self.username:
469 tags["user_name"] = {"value": [self.username], "numeric_value": None}
469 tags["user_name"] = {"value": [self.username], "numeric_value": None}
470 return {
470 return {
471 "_id": str(self.id),
471 "_id": str(self.id),
472 "pg_id": str(self.id),
472 "report_id": str(self.id),
473 "resource_id": self.resource_id,
473 "resource_id": self.resource_id,
474 "http_status": self.http_status or "",
474 "http_status": self.http_status or "",
475 "start_time": self.start_time,
475 "start_time": self.start_time,
476 "end_time": self.end_time,
476 "end_time": self.end_time,
477 "url_domain": self.url_domain if self.url_domain else "",
477 "url_domain": self.url_domain if self.url_domain else "",
478 "url_path": self.url_path if self.url_path else "",
478 "url_path": self.url_path if self.url_path else "",
479 "duration": self.duration,
479 "duration": self.duration,
480 "error": self.error if self.error else "",
480 "error": self.error if self.error else "",
481 "report_type": self.report_type,
481 "report_type": self.report_type,
482 "request_id": self.request_id,
482 "request_id": self.request_id,
483 "ip": self.ip,
483 "ip": self.ip,
484 "group_id": str(self.group_id),
484 "group_id": str(self.group_id),
485 "_parent": str(self.group_id),
485 "type": "report",
486 "join_field": {
487 "name": "report",
488 "parent": str(self.group_id)
489 },
486 "tags": tags,
490 "tags": tags,
487 "tag_list": tag_list,
491 "tag_list": tag_list,
492 "_routing": str(self.group_id)
488 }
493 }
489
494
490 @property
495 @property
491 def partition_id(self):
496 def partition_id(self):
492 return "rcae_r_%s" % self.report_group_time.strftime("%Y_%m")
497 return "rcae_r_%s" % self.report_group_time.strftime("%Y_%m")
493
498
494 def partition_range(self):
499 def partition_range(self):
495 start_date = self.report_group_time.date().replace(day=1)
500 start_date = self.report_group_time.date().replace(day=1)
496 end_date = start_date + timedelta(days=40)
501 end_date = start_date + timedelta(days=40)
497 end_date = end_date.replace(day=1)
502 end_date = end_date.replace(day=1)
498 return start_date, end_date
503 return start_date, end_date
499
504
500
505
501 def after_insert(mapper, connection, target):
506 def after_insert(mapper, connection, target):
502 if not hasattr(target, "_skip_ft_index"):
507 if not hasattr(target, "_skip_ft_index"):
503 data = target.es_doc()
508 data = target.es_doc()
504 data.pop("_id", None)
509 data.pop("_id", None)
505 Datastores.es.index(
510 Datastores.es.index(
506 target.partition_id, "report", data, parent=target.group_id, id=target.id
511 target.partition_id, "report", data, parent=target.group_id, id=target.id
507 )
512 )
508
513
509
514
510 def after_update(mapper, connection, target):
515 def after_update(mapper, connection, target):
511 if not hasattr(target, "_skip_ft_index"):
516 if not hasattr(target, "_skip_ft_index"):
512 data = target.es_doc()
517 data = target.es_doc()
513 data.pop("_id", None)
518 data.pop("_id", None)
514 Datastores.es.index(
519 Datastores.es.index(
515 target.partition_id, "report", data, parent=target.group_id, id=target.id
520 target.partition_id, "report", data, parent=target.group_id, id=target.id
516 )
521 )
517
522
518
523
519 def after_delete(mapper, connection, target):
524 def after_delete(mapper, connection, target):
520 if not hasattr(target, "_skip_ft_index"):
525 if not hasattr(target, "_skip_ft_index"):
521 query = {"query": {"term": {"pg_id": target.id}}}
526 query = {"query": {"term": {"report_id": target.id}}}
522 Datastores.es.delete_by_query(
527 Datastores.es.delete_by_query(
523 index=target.partition_id, doc_type="report", body=query, conflicts="proceed"
528 index=target.partition_id, doc_type="report", body=query, conflicts="proceed"
524 )
529 )
525
530
526
531
527 sa.event.listen(Report, "after_insert", after_insert)
532 sa.event.listen(Report, "after_insert", after_insert)
528 sa.event.listen(Report, "after_update", after_update)
533 sa.event.listen(Report, "after_update", after_update)
529 sa.event.listen(Report, "after_delete", after_delete)
534 sa.event.listen(Report, "after_delete", after_delete)
@@ -1,285 +1,285 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 #
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
7 # You may obtain a copy of the License at
8 #
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
10 #
11 # Unless required by applicable law or agreed to in writing, software
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
15 # limitations under the License.
16
16
17 import logging
17 import logging
18 import sqlalchemy as sa
18 import sqlalchemy as sa
19
19
20 from datetime import datetime, timedelta
20 from datetime import datetime, timedelta
21
21
22 from pyramid.threadlocal import get_current_request
22 from pyramid.threadlocal import get_current_request
23 from sqlalchemy.dialects.postgresql import JSON
23 from sqlalchemy.dialects.postgresql import JSON
24 from ziggurat_foundations.models.base import BaseModel
24 from ziggurat_foundations.models.base import BaseModel
25
25
26 from appenlight.models import Base, get_db_session, Datastores
26 from appenlight.models import Base, get_db_session, Datastores
27 from appenlight.lib.enums import ReportType
27 from appenlight.lib.enums import ReportType
28 from appenlight.lib.rule import Rule
28 from appenlight.lib.rule import Rule
29 from appenlight.lib.redis_keys import REDIS_KEYS
29 from appenlight.lib.redis_keys import REDIS_KEYS
30 from appenlight.models.report import REPORT_TYPE_MATRIX
30 from appenlight.models.report import REPORT_TYPE_MATRIX
31
31
32 log = logging.getLogger(__name__)
32 log = logging.getLogger(__name__)
33
33
34
34
35 class ReportGroup(Base, BaseModel):
35 class ReportGroup(Base, BaseModel):
36 __tablename__ = "reports_groups"
36 __tablename__ = "reports_groups"
37 __table_args__ = {"implicit_returning": False}
37 __table_args__ = {"implicit_returning": False}
38
38
39 id = sa.Column(sa.BigInteger(), nullable=False, primary_key=True)
39 id = sa.Column(sa.BigInteger(), nullable=False, primary_key=True)
40 resource_id = sa.Column(
40 resource_id = sa.Column(
41 sa.Integer(),
41 sa.Integer(),
42 sa.ForeignKey(
42 sa.ForeignKey(
43 "applications.resource_id", onupdate="CASCADE", ondelete="CASCADE"
43 "applications.resource_id", onupdate="CASCADE", ondelete="CASCADE"
44 ),
44 ),
45 nullable=False,
45 nullable=False,
46 index=True,
46 index=True,
47 )
47 )
48 priority = sa.Column(
48 priority = sa.Column(
49 sa.Integer, nullable=False, index=True, default=5, server_default="5"
49 sa.Integer, nullable=False, index=True, default=5, server_default="5"
50 )
50 )
51 first_timestamp = sa.Column(
51 first_timestamp = sa.Column(
52 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
52 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
53 )
53 )
54 last_timestamp = sa.Column(
54 last_timestamp = sa.Column(
55 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
55 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
56 )
56 )
57 error = sa.Column(sa.UnicodeText(), index=True)
57 error = sa.Column(sa.UnicodeText(), index=True)
58 grouping_hash = sa.Column(sa.String(40), default="")
58 grouping_hash = sa.Column(sa.String(40), default="")
59 triggered_postprocesses_ids = sa.Column(JSON(), nullable=False, default=list)
59 triggered_postprocesses_ids = sa.Column(JSON(), nullable=False, default=list)
60 report_type = sa.Column(sa.Integer, default=1)
60 report_type = sa.Column(sa.Integer, default=1)
61 total_reports = sa.Column(sa.Integer, default=1)
61 total_reports = sa.Column(sa.Integer, default=1)
62 last_report = sa.Column(sa.Integer)
62 last_report = sa.Column(sa.Integer)
63 occurences = sa.Column(sa.Integer, default=1)
63 occurences = sa.Column(sa.Integer, default=1)
64 average_duration = sa.Column(sa.Float, default=0)
64 average_duration = sa.Column(sa.Float, default=0)
65 summed_duration = sa.Column(sa.Float, default=0)
65 summed_duration = sa.Column(sa.Float, default=0)
66 read = sa.Column(sa.Boolean(), index=True, default=False)
66 read = sa.Column(sa.Boolean(), index=True, default=False)
67 fixed = sa.Column(sa.Boolean(), index=True, default=False)
67 fixed = sa.Column(sa.Boolean(), index=True, default=False)
68 notified = sa.Column(sa.Boolean(), index=True, default=False)
68 notified = sa.Column(sa.Boolean(), index=True, default=False)
69 public = sa.Column(sa.Boolean(), index=True, default=False)
69 public = sa.Column(sa.Boolean(), index=True, default=False)
70
70
71 reports = sa.orm.relationship(
71 reports = sa.orm.relationship(
72 "Report",
72 "Report",
73 lazy="dynamic",
73 lazy="dynamic",
74 backref="report_group",
74 backref="report_group",
75 cascade="all, delete-orphan",
75 cascade="all, delete-orphan",
76 passive_deletes=True,
76 passive_deletes=True,
77 passive_updates=True,
77 passive_updates=True,
78 )
78 )
79
79
80 comments = sa.orm.relationship(
80 comments = sa.orm.relationship(
81 "ReportComment",
81 "ReportComment",
82 lazy="dynamic",
82 lazy="dynamic",
83 backref="report",
83 backref="report",
84 cascade="all, delete-orphan",
84 cascade="all, delete-orphan",
85 passive_deletes=True,
85 passive_deletes=True,
86 passive_updates=True,
86 passive_updates=True,
87 order_by="ReportComment.comment_id",
87 order_by="ReportComment.comment_id",
88 )
88 )
89
89
90 assigned_users = sa.orm.relationship(
90 assigned_users = sa.orm.relationship(
91 "User",
91 "User",
92 backref=sa.orm.backref(
92 backref=sa.orm.backref(
93 "assigned_reports_relation",
93 "assigned_reports_relation",
94 lazy="dynamic",
94 lazy="dynamic",
95 order_by=sa.desc(sa.text("reports_groups.id")),
95 order_by=sa.desc(sa.text("reports_groups.id")),
96 ),
96 ),
97 passive_deletes=True,
97 passive_deletes=True,
98 passive_updates=True,
98 passive_updates=True,
99 secondary="reports_assignments",
99 secondary="reports_assignments",
100 order_by="User.user_name",
100 order_by="User.user_name",
101 )
101 )
102
102
103 stats = sa.orm.relationship(
103 stats = sa.orm.relationship(
104 "ReportStat",
104 "ReportStat",
105 lazy="dynamic",
105 lazy="dynamic",
106 backref="report",
106 backref="report",
107 passive_deletes=True,
107 passive_deletes=True,
108 passive_updates=True,
108 passive_updates=True,
109 )
109 )
110
110
111 last_report_ref = sa.orm.relationship(
111 last_report_ref = sa.orm.relationship(
112 "Report",
112 "Report",
113 uselist=False,
113 uselist=False,
114 primaryjoin="ReportGroup.last_report " "== Report.id",
114 primaryjoin="ReportGroup.last_report " "== Report.id",
115 foreign_keys="Report.id",
115 foreign_keys="Report.id",
116 cascade="all, delete-orphan",
116 cascade="all, delete-orphan",
117 passive_deletes=True,
117 passive_deletes=True,
118 passive_updates=True,
118 passive_updates=True,
119 )
119 )
120
120
121 def __repr__(self):
121 def __repr__(self):
122 return "<ReportGroup id:{}>".format(self.id)
122 return "<ReportGroup id:{}>".format(self.id)
123
123
124 def get_report(self, report_id=None, public=False):
124 def get_report(self, report_id=None, public=False):
125 """
125 """
126 Gets the report with a specific id, or the latest report if no id was specified
126 Gets the report with a specific id, or the latest report if no id was specified
127 """
127 """
128 from .report import Report
128 from .report import Report
129
129
130 if not report_id:
130 if not report_id:
131 return self.last_report_ref
131 return self.last_report_ref
132 else:
132 else:
133 return self.reports.filter(Report.id == report_id).first()
133 return self.reports.filter(Report.id == report_id).first()
134
134
135 def get_public_url(self, request, _app_url=None):
135 def get_public_url(self, request, _app_url=None):
136 url = request.route_url("/", _app_url=_app_url)
136 url = request.route_url("/", _app_url=_app_url)
137 return (url + "ui/report/%s") % self.id
137 return (url + "ui/report/%s") % self.id
138
138
139 def run_postprocessing(self, report):
139 def run_postprocessing(self, report):
140 """
140 """
141 Alters report group priority based on postprocessing configuration
141 Alters report group priority based on postprocessing configuration
142 """
142 """
143 request = get_current_request()
143 request = get_current_request()
144 get_db_session(None, self).flush()
144 get_db_session(None, self).flush()
145 for action in self.application.postprocess_conf:
145 for action in self.application.postprocess_conf:
146 get_db_session(None, self).flush()
146 get_db_session(None, self).flush()
147 rule_obj = Rule(action.rule, REPORT_TYPE_MATRIX)
147 rule_obj = Rule(action.rule, REPORT_TYPE_MATRIX)
148 report_dict = report.get_dict(request)
148 report_dict = report.get_dict(request)
149 # if it was not processed yet
149 # if it was not processed yet
150 if (
150 if (
151 rule_obj.match(report_dict)
151 rule_obj.match(report_dict)
152 and action.pkey not in self.triggered_postprocesses_ids
152 and action.pkey not in self.triggered_postprocesses_ids
153 ):
153 ):
154 action.postprocess(self)
154 action.postprocess(self)
155 # this way sqla can track mutation of list
155 # this way sqla can track mutation of list
156 self.triggered_postprocesses_ids = self.triggered_postprocesses_ids + [
156 self.triggered_postprocesses_ids = self.triggered_postprocesses_ids + [
157 action.pkey
157 action.pkey
158 ]
158 ]
159
159
160 get_db_session(None, self).flush()
160 get_db_session(None, self).flush()
161 # do not go out of bounds
161 # do not go out of bounds
162 if self.priority < 1:
162 if self.priority < 1:
163 self.priority = 1
163 self.priority = 1
164 if self.priority > 10:
164 if self.priority > 10:
165 self.priority = 10
165 self.priority = 10
166
166
167 def get_dict(self, request):
167 def get_dict(self, request):
168 instance_dict = super(ReportGroup, self).get_dict()
168 instance_dict = super(ReportGroup, self).get_dict()
169 instance_dict["server_name"] = self.get_report().tags.get("server_name")
169 instance_dict["server_name"] = self.get_report().tags.get("server_name")
170 instance_dict["view_name"] = self.get_report().tags.get("view_name")
170 instance_dict["view_name"] = self.get_report().tags.get("view_name")
171 instance_dict["resource_name"] = self.application.resource_name
171 instance_dict["resource_name"] = self.application.resource_name
172 instance_dict["report_type"] = self.get_report().report_type
172 instance_dict["report_type"] = self.get_report().report_type
173 instance_dict["url_path"] = self.get_report().url_path
173 instance_dict["url_path"] = self.get_report().url_path
174 instance_dict["front_url"] = self.get_report().get_public_url(request)
174 instance_dict["front_url"] = self.get_report().get_public_url(request)
175 del instance_dict["triggered_postprocesses_ids"]
175 del instance_dict["triggered_postprocesses_ids"]
176 return instance_dict
176 return instance_dict
177
177
178 def es_doc(self):
178 def es_doc(self):
179 return {
179 return {
180 "_id": str(self.id),
180 "_id": str(self.id),
181 "pg_id": str(self.id),
181 "group_id": str(self.id),
182 "resource_id": self.resource_id,
182 "resource_id": self.resource_id,
183 "error": self.error,
183 "error": self.error,
184 "fixed": self.fixed,
184 "fixed": self.fixed,
185 "public": self.public,
185 "public": self.public,
186 "read": self.read,
186 "read": self.read,
187 "priority": self.priority,
187 "priority": self.priority,
188 "occurences": self.occurences,
188 "occurences": self.occurences,
189 "average_duration": self.average_duration,
189 "average_duration": self.average_duration,
190 "summed_duration": self.summed_duration,
190 "summed_duration": self.summed_duration,
191 "first_timestamp": self.first_timestamp,
191 "first_timestamp": self.first_timestamp,
192 "last_timestamp": self.last_timestamp,
192 "last_timestamp": self.last_timestamp,
193 "type": "report_group",
194 "join_field": {
195 "name": "report_group"
196 },
193 }
197 }
194
198
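# Note: in the join-field layout introduced by this change, report_group
# documents are the parent side of the relation, so their join_field above
# carries only the relation name; child report documents add a "parent" key
# pointing at the group id and are routed to the same shard.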
195 def set_notification_info(self, notify_10=False, notify_100=False):
199 def set_notification_info(self, notify_10=False, notify_100=False):
196 """
200 """
197 Update redis notification maps for notification job
201 Update redis notification maps for notification job
198 """
202 """
199 current_time = datetime.utcnow().replace(second=0, microsecond=0)
203 current_time = datetime.utcnow().replace(second=0, microsecond=0)
200 # global app counter
204 # global app counter
201 key = REDIS_KEYS["counters"]["reports_per_type"].format(
205 key = REDIS_KEYS["counters"]["reports_per_type"].format(
202 self.report_type, current_time
206 self.report_type, current_time
203 )
207 )
204 redis_pipeline = Datastores.redis.pipeline()
208 redis_pipeline = Datastores.redis.pipeline()
205 redis_pipeline.incr(key)
209 redis_pipeline.incr(key)
206 redis_pipeline.expire(key, 3600 * 24)
210 redis_pipeline.expire(key, 3600 * 24)
207 # detailed app notification for alerts and notifications
211 # detailed app notification for alerts and notifications
208 redis_pipeline.sadd(REDIS_KEYS["apps_that_had_reports"], self.resource_id)
212 redis_pipeline.sadd(REDIS_KEYS["apps_that_had_reports"], self.resource_id)
209 redis_pipeline.sadd(
213 redis_pipeline.sadd(
210 REDIS_KEYS["apps_that_had_reports_alerting"], self.resource_id
214 REDIS_KEYS["apps_that_had_reports_alerting"], self.resource_id
211 )
215 )
212 # only notify for exceptions here
216 # only notify for exceptions here
213 if self.report_type == ReportType.error:
217 if self.report_type == ReportType.error:
214 redis_pipeline.sadd(REDIS_KEYS["apps_that_had_reports"], self.resource_id)
218 redis_pipeline.sadd(REDIS_KEYS["apps_that_had_reports"], self.resource_id)
215 redis_pipeline.sadd(
219 redis_pipeline.sadd(
216 REDIS_KEYS["apps_that_had_error_reports_alerting"], self.resource_id
220 REDIS_KEYS["apps_that_had_error_reports_alerting"], self.resource_id
217 )
221 )
218 key = REDIS_KEYS["counters"]["report_group_occurences"].format(self.id)
222 key = REDIS_KEYS["counters"]["report_group_occurences"].format(self.id)
219 redis_pipeline.incr(key)
223 redis_pipeline.incr(key)
220 redis_pipeline.expire(key, 3600 * 24)
224 redis_pipeline.expire(key, 3600 * 24)
221 key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format(self.id)
225 key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format(self.id)
222 redis_pipeline.incr(key)
226 redis_pipeline.incr(key)
223 redis_pipeline.expire(key, 3600 * 24)
227 redis_pipeline.expire(key, 3600 * 24)
224
228
225 if notify_10:
229 if notify_10:
226 key = REDIS_KEYS["counters"]["report_group_occurences_10th"].format(self.id)
230 key = REDIS_KEYS["counters"]["report_group_occurences_10th"].format(self.id)
227 redis_pipeline.setex(key, 3600 * 24, 1)
231 redis_pipeline.setex(key, 3600 * 24, 1)
228 if notify_100:
232 if notify_100:
229 key = REDIS_KEYS["counters"]["report_group_occurences_100th"].format(
233 key = REDIS_KEYS["counters"]["report_group_occurences_100th"].format(
230 self.id
234 self.id
231 )
235 )
232 redis_pipeline.setex(key, 3600 * 24, 1)
236 redis_pipeline.setex(key, 3600 * 24, 1)
233
237
234 key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
238 key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
235 self.report_type, self.resource_id
239 self.report_type, self.resource_id
236 )
240 )
237 redis_pipeline.sadd(key, self.id)
241 redis_pipeline.sadd(key, self.id)
238 redis_pipeline.expire(key, 3600 * 24)
242 redis_pipeline.expire(key, 3600 * 24)
239 key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
243 key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
240 self.report_type, self.resource_id
244 self.report_type, self.resource_id
241 )
245 )
242 redis_pipeline.sadd(key, self.id)
246 redis_pipeline.sadd(key, self.id)
243 redis_pipeline.expire(key, 3600 * 24)
247 redis_pipeline.expire(key, 3600 * 24)
244 redis_pipeline.execute()
248 redis_pipeline.execute()
245
249
246 @property
250 @property
247 def partition_id(self):
251 def partition_id(self):
248 return "rcae_r_%s" % self.first_timestamp.strftime("%Y_%m")
252 return "rcae_r_%s" % self.first_timestamp.strftime("%Y_%m")
249
253
250 def partition_range(self):
254 def partition_range(self):
251 start_date = self.first_timestamp.date().replace(day=1)
255 start_date = self.first_timestamp.date().replace(day=1)
252 end_date = start_date + timedelta(days=40)
256 end_date = start_date + timedelta(days=40)
253 end_date = end_date.replace(day=1)
257 end_date = end_date.replace(day=1)
254 return start_date, end_date
258 return start_date, end_date
255
259
256
260
257 def after_insert(mapper, connection, target):
261 def after_insert(mapper, connection, target):
258 if not hasattr(target, "_skip_ft_index"):
262 if not hasattr(target, "_skip_ft_index"):
259 data = target.es_doc()
263 data = target.es_doc()
260 data.pop("_id", None)
264 data.pop("_id", None)
261 Datastores.es.index(target.partition_id, "report_group", data, id=target.id)
265 Datastores.es.index(target.partition_id, "report", data, id=target.id)
262
266
263
267
264 def after_update(mapper, connection, target):
268 def after_update(mapper, connection, target):
265 if not hasattr(target, "_skip_ft_index"):
269 if not hasattr(target, "_skip_ft_index"):
266 data = target.es_doc()
270 data = target.es_doc()
267 data.pop("_id", None)
271 data.pop("_id", None)
268 Datastores.es.index(target.partition_id, "report_group", data, id=target.id)
272 Datastores.es.index(target.partition_id, "report", data, id=target.id)
269
273
270
274
271 def after_delete(mapper, connection, target):
275 def after_delete(mapper, connection, target):
272 query = {"query": {"term": {"group_id": target.id}}}
276 query = {"query": {"term": {"group_id": target.id}}}
273 # delete by query
277 # delete by query
274 Datastores.es.delete_by_query(
278 Datastores.es.delete_by_query(
275 index=target.partition_id, doc_type="report", body=query, conflicts="proceed"
279 index=target.partition_id, doc_type="report", body=query, conflicts="proceed"
276 )
280 )
277 query = {"query": {"term": {"pg_id": target.id}}}
278 Datastores.es.delete_by_query(
279 index=target.partition_id, doc_type="report_group", body=query, conflicts="proceed"
280 )
281
281
282
282
283 sa.event.listen(ReportGroup, "after_insert", after_insert)
283 sa.event.listen(ReportGroup, "after_insert", after_insert)
284 sa.event.listen(ReportGroup, "after_update", after_update)
284 sa.event.listen(ReportGroup, "after_update", after_update)
285 sa.event.listen(ReportGroup, "after_delete", after_delete)
285 sa.event.listen(ReportGroup, "after_delete", after_delete)
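With the move to a single doc type per index, the parent/child relation between report groups and reports is expressed through the new "join_field" emitted by es_doc() above, and the SQLAlchemy event handlers now index everything under doc_type "report". The mapping that backs this is not part of the hunk shown here; the snippet below is only a rough sketch of what a parent-join mapping and the corresponding index calls could look like on Elasticsearch 6.x (the index name, field list and ids are assumptions, not taken from this changeset):

from elasticsearch import Elasticsearch

es = Elasticsearch()

# hypothetical single-doctype index with a parent-join field; the real
# mapping lives in AppEnlight's index templates, not in this diff
mapping = {
    "mappings": {
        "report": {
            "properties": {
                "join_field": {
                    "type": "join",
                    "relations": {"report_group": "report"},
                }
            }
        }
    }
}
es.indices.create(index="rcae_r_2017_05", body=mapping)

# parent document (the report group), shaped like ReportGroup.es_doc()
es.index(index="rcae_r_2017_05", doc_type="report", id="10",
         body={"group_id": "10", "join_field": {"name": "report_group"}})

# a child document (an individual report) must be routed to its parent's shard
es.index(index="rcae_r_2017_05", doc_type="report", id="1001", routing="10",
         body={"report_id": "1001",
               "join_field": {"name": "report", "parent": "10"}})

Routing children to the parent's shard is what keeps the has_parent queries and parent-join aggregations used later in ReportGroupService working after the single-doctype switch.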
@@ -1,79 +1,81 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 #
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
7 # You may obtain a copy of the License at
8 #
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
10 #
11 # Unless required by applicable law or agreed to in writing, software
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
15 # limitations under the License.
16
16
17 import sqlalchemy as sa
17 import sqlalchemy as sa
18
18
19 from appenlight.lib.enums import ReportType
19 from appenlight.lib.enums import ReportType
20 from appenlight.models import Base
20 from appenlight.models import Base
21 from ziggurat_foundations.models.base import BaseModel
21 from ziggurat_foundations.models.base import BaseModel
22
22
23
23
24 class ReportStat(Base, BaseModel):
24 class ReportStat(Base, BaseModel):
25 __tablename__ = "reports_stats"
25 __tablename__ = "reports_stats"
26 __table_args__ = {"implicit_returning": False}
26 __table_args__ = {"implicit_returning": False}
27
27
28 group_id = sa.Column(
28 group_id = sa.Column(
29 sa.BigInteger(), sa.ForeignKey("reports_groups.id"), nullable=False
29 sa.BigInteger(), sa.ForeignKey("reports_groups.id"), nullable=False
30 )
30 )
31 resource_id = sa.Column(
31 resource_id = sa.Column(
32 sa.Integer(), sa.ForeignKey("applications.resource_id"), nullable=False
32 sa.Integer(), sa.ForeignKey("applications.resource_id"), nullable=False
33 )
33 )
34 start_interval = sa.Column(sa.DateTime(), nullable=False)
34 start_interval = sa.Column(sa.DateTime(), nullable=False)
35 occurences = sa.Column(sa.Integer, nullable=True, default=0)
35 occurences = sa.Column(sa.Integer, nullable=True, default=0)
36 owner_user_id = sa.Column(sa.Integer(), sa.ForeignKey("users.id"), nullable=True)
36 owner_user_id = sa.Column(sa.Integer(), sa.ForeignKey("users.id"), nullable=True)
37 type = sa.Column(sa.Integer, nullable=True, default=0)
37 type = sa.Column(sa.Integer, nullable=True, default=0)
38 duration = sa.Column(sa.Float, nullable=True, default=0)
38 duration = sa.Column(sa.Float, nullable=True, default=0)
39 id = sa.Column(sa.BigInteger, nullable=False, primary_key=True)
39 id = sa.Column(sa.BigInteger, nullable=False, primary_key=True)
40 server_name = sa.Column(sa.Unicode(128), nullable=False, default="")
40 server_name = sa.Column(sa.Unicode(128), nullable=False, default="")
41 view_name = sa.Column(sa.Unicode(128), nullable=False, default="")
41 view_name = sa.Column(sa.Unicode(128), nullable=False, default="")
42
42
43 @property
43 @property
44 def partition_id(self):
44 def partition_id(self):
45 return "rcae_r_%s" % self.start_interval.strftime("%Y_%m")
45 return "rcae_r_%s" % self.start_interval.strftime("%Y_%m")
46
46
47 def es_doc(self):
47 def es_doc(self):
48 return {
48 return {
49 "resource_id": self.resource_id,
49 "resource_id": self.resource_id,
50 "timestamp": self.start_interval,
50 "timestamp": self.start_interval,
51 "pg_id": str(self.id),
51 "report_stat_id": str(self.id),
52 "permanent": True,
52 "permanent": True,
53 "request_id": None,
53 "request_id": None,
54 "log_level": "ERROR",
54 "log_level": "ERROR",
55 "message": None,
55 "message": None,
56 "namespace": "appenlight.error",
56 "namespace": "appenlight.error",
57 "group_id": str(self.group_id),
57 "tags": {
58 "tags": {
58 "duration": {"values": self.duration, "numeric_values": self.duration},
59 "duration": {"values": self.duration, "numeric_values": self.duration},
59 "occurences": {
60 "occurences": {
60 "values": self.occurences,
61 "values": self.occurences,
61 "numeric_values": self.occurences,
62 "numeric_values": self.occurences,
62 },
63 },
63 "group_id": {"values": self.group_id, "numeric_values": self.group_id},
64 "group_id": {"values": self.group_id, "numeric_values": self.group_id},
64 "type": {
65 "type": {
65 "values": ReportType.key_from_value(self.type),
66 "values": ReportType.key_from_value(self.type),
66 "numeric_values": self.type,
67 "numeric_values": self.type,
67 },
68 },
68 "server_name": {"values": self.server_name, "numeric_values": None},
69 "server_name": {"values": self.server_name, "numeric_values": None},
69 "view_name": {"values": self.view_name, "numeric_values": None},
70 "view_name": {"values": self.view_name, "numeric_values": None},
70 },
71 },
71 "tag_list": [
72 "tag_list": [
72 "duration",
73 "duration",
73 "occurences",
74 "occurences",
74 "group_id",
75 "group_id",
75 "type",
76 "type",
76 "server_name",
77 "server_name",
77 "view_name",
78 "view_name",
78 ],
79 ],
80 "type": "report_stat",
79 }
81 }
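Because report stats now share their monthly partition with other report documents, the explicit "type": "report_stat" marker added above is what distinguishes them at query time. A minimal sketch of selecting only stat documents from one partition (the index name and client setup are assumptions):

from elasticsearch import Elasticsearch

es = Elasticsearch()
query = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"type": "report_stat"}},  # the new discriminator field
                {"term": {"resource_id": 1}},
            ]
        }
    }
}
result = es.search(index="rcae_r_2017_05", body=query, size=10)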
@@ -1,222 +1,222 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 #
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
7 # You may obtain a copy of the License at
8 #
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
10 #
11 # Unless required by applicable law or agreed to in writing, software
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
15 # limitations under the License.
16
16
17 import paginate
17 import paginate
18 import logging
18 import logging
19 import sqlalchemy as sa
19 import sqlalchemy as sa
20
20
21 from appenlight.models.log import Log
21 from appenlight.models.log import Log
22 from appenlight.models import get_db_session, Datastores
22 from appenlight.models import get_db_session, Datastores
23 from appenlight.models.services.base import BaseService
23 from appenlight.models.services.base import BaseService
24 from appenlight.lib.utils import es_index_name_limiter
24 from appenlight.lib.utils import es_index_name_limiter
25
25
26 log = logging.getLogger(__name__)
26 log = logging.getLogger(__name__)
27
27
28
28
29 class LogService(BaseService):
29 class LogService(BaseService):
30 @classmethod
30 @classmethod
31 def get_logs(cls, resource_ids=None, filter_settings=None, db_session=None):
31 def get_logs(cls, resource_ids=None, filter_settings=None, db_session=None):
32 # ensure we always have ids passed
32 # ensure we always have ids passed
33 if not resource_ids:
33 if not resource_ids:
34 # raise Exception('No App ID passed')
34 # raise Exception('No App ID passed')
35 return []
35 return []
36 db_session = get_db_session(db_session)
36 db_session = get_db_session(db_session)
37 q = db_session.query(Log)
37 q = db_session.query(Log)
38 q = q.filter(Log.resource_id.in_(resource_ids))
38 q = q.filter(Log.resource_id.in_(resource_ids))
39 if filter_settings.get("start_date"):
39 if filter_settings.get("start_date"):
40 q = q.filter(Log.timestamp >= filter_settings.get("start_date"))
40 q = q.filter(Log.timestamp >= filter_settings.get("start_date"))
41 if filter_settings.get("end_date"):
41 if filter_settings.get("end_date"):
42 q = q.filter(Log.timestamp <= filter_settings.get("end_date"))
42 q = q.filter(Log.timestamp <= filter_settings.get("end_date"))
43 if filter_settings.get("log_level"):
43 if filter_settings.get("log_level"):
44 q = q.filter(Log.log_level == filter_settings.get("log_level").upper())
44 q = q.filter(Log.log_level == filter_settings.get("log_level").upper())
45 if filter_settings.get("request_id"):
45 if filter_settings.get("request_id"):
46 request_id = filter_settings.get("request_id", "")
46 request_id = filter_settings.get("request_id", "")
47 q = q.filter(Log.request_id == request_id.replace("-", ""))
47 q = q.filter(Log.request_id == request_id.replace("-", ""))
48 if filter_settings.get("namespace"):
48 if filter_settings.get("namespace"):
49 q = q.filter(Log.namespace == filter_settings.get("namespace"))
49 q = q.filter(Log.namespace == filter_settings.get("namespace"))
50 q = q.order_by(sa.desc(Log.timestamp))
50 q = q.order_by(sa.desc(Log.timestamp))
51 return q
51 return q
52
52
53 @classmethod
53 @classmethod
54 def es_query_builder(cls, app_ids, filter_settings):
54 def es_query_builder(cls, app_ids, filter_settings):
55 if not filter_settings:
55 if not filter_settings:
56 filter_settings = {}
56 filter_settings = {}
57
57
58 query = {
58 query = {
59 "query": {
59 "query": {
60 "bool": {
60 "bool": {
61 "filter": [{"terms": {"resource_id": list(app_ids)}}]
61 "filter": [{"terms": {"resource_id": list(app_ids)}}]
62 }
62 }
63 }
63 }
64 }
64 }
65
65
66 start_date = filter_settings.get("start_date")
66 start_date = filter_settings.get("start_date")
67 end_date = filter_settings.get("end_date")
67 end_date = filter_settings.get("end_date")
68 filter_part = query["query"]["bool"]["filter"]
68 filter_part = query["query"]["bool"]["filter"]
69
69
70 for tag in filter_settings.get("tags", []):
70 for tag in filter_settings.get("tags", []):
71 tag_values = [v.lower() for v in tag["value"]]
71 tag_values = [v.lower() for v in tag["value"]]
72 key = "tags.%s.values" % tag["name"].replace(".", "_")
72 key = "tags.%s.values" % tag["name"].replace(".", "_")
73 filter_part.append({"terms": {key: tag_values}})
73 filter_part.append({"terms": {key: tag_values}})
74
74
75 date_range = {"range": {"timestamp": {}}}
75 date_range = {"range": {"timestamp": {}}}
76 if start_date:
76 if start_date:
77 date_range["range"]["timestamp"]["gte"] = start_date
77 date_range["range"]["timestamp"]["gte"] = start_date
78 if end_date:
78 if end_date:
79 date_range["range"]["timestamp"]["lte"] = end_date
79 date_range["range"]["timestamp"]["lte"] = end_date
80 if start_date or end_date:
80 if start_date or end_date:
81 filter_part.append(date_range)
81 filter_part.append(date_range)
82
82
83 levels = filter_settings.get("level")
83 levels = filter_settings.get("level")
84 if levels:
84 if levels:
85 filter_part.append({"terms": {"log_level": levels}})
85 filter_part.append({"terms": {"log_level": levels}})
86 namespaces = filter_settings.get("namespace")
86 namespaces = filter_settings.get("namespace")
87 if namespaces:
87 if namespaces:
88 filter_part.append({"terms": {"namespace": namespaces}})
88 filter_part.append({"terms": {"namespace": namespaces}})
89
89
90 request_ids = filter_settings.get("request_id")
90 request_ids = filter_settings.get("request_id")
91 if request_ids:
91 if request_ids:
92 filter_part.append({"terms": {"request_id": request_ids}})
92 filter_part.append({"terms": {"request_id": request_ids}})
93
93
94 messages = filter_settings.get("message")
94 messages = filter_settings.get("message")
95 if messages:
95 if messages:
96 query["query"]["bool"]["must"] = {
96 query["query"]["bool"]["must"] = {
97 "match": {"message": {"query": " ".join(messages), "operator": "and"}}
97 "match": {"message": {"query": " ".join(messages), "operator": "and"}}
98 }
98 }
99 return query
99 return query
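For reference, a rough sketch of what this builder yields for a simple filter; the values are made up and it assumes appenlight is importable, but the resulting structure follows the code above:

from appenlight.models.services.log import LogService

filter_settings = {
    "start_date": "2017-05-01T00:00:00",
    "end_date": "2017-05-02T00:00:00",
    "level": ["error"],
}
query = LogService.es_query_builder([1], filter_settings)
# query == {
#     "query": {"bool": {"filter": [
#         {"terms": {"resource_id": [1]}},
#         {"range": {"timestamp": {"gte": "2017-05-01T00:00:00",
#                                  "lte": "2017-05-02T00:00:00"}}},
#         {"terms": {"log_level": ["error"]}},
#     ]}}
# }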
100
100
101 @classmethod
101 @classmethod
102 def get_time_series_aggregate(cls, app_ids=None, filter_settings=None):
102 def get_time_series_aggregate(cls, app_ids=None, filter_settings=None):
103 if not app_ids:
103 if not app_ids:
104 return {}
104 return {}
105 es_query = cls.es_query_builder(app_ids, filter_settings)
105 es_query = cls.es_query_builder(app_ids, filter_settings)
106 es_query["aggs"] = {
106 es_query["aggs"] = {
107 "events_over_time": {
107 "events_over_time": {
108 "date_histogram": {
108 "date_histogram": {
109 "field": "timestamp",
109 "field": "timestamp",
110 "interval": "1h",
110 "interval": "1h",
111 "min_doc_count": 0,
111 "min_doc_count": 0,
112 "extended_bounds": {
112 "extended_bounds": {
113 "max": filter_settings.get("end_date"),
113 "max": filter_settings.get("end_date"),
114 "min": filter_settings.get("start_date"),
114 "min": filter_settings.get("start_date"),
115 },
115 },
116 }
116 }
117 }
117 }
118 }
118 }
119 log.debug(es_query)
119 log.debug(es_query)
120 index_names = es_index_name_limiter(
120 index_names = es_index_name_limiter(
121 filter_settings.get("start_date"),
121 filter_settings.get("start_date"),
122 filter_settings.get("end_date"),
122 filter_settings.get("end_date"),
123 ixtypes=["logs"],
123 ixtypes=["logs"],
124 )
124 )
125 if index_names:
125 if index_names:
126 results = Datastores.es.search(
126 results = Datastores.es.search(
127 body=es_query, index=index_names, doc_type="log", size=0
127 body=es_query, index=index_names, doc_type="log", size=0
128 )
128 )
129 else:
129 else:
130 results = []
130 results = []
131 return results
131 return results
132
132
133 @classmethod
133 @classmethod
134 def get_search_iterator(
134 def get_search_iterator(
135 cls,
135 cls,
136 app_ids=None,
136 app_ids=None,
137 page=1,
137 page=1,
138 items_per_page=50,
138 items_per_page=50,
139 order_by=None,
139 order_by=None,
140 filter_settings=None,
140 filter_settings=None,
141 limit=None,
141 limit=None,
142 ):
142 ):
143 if not app_ids:
143 if not app_ids:
144 return {}, 0
144 return {}, 0
145
145
146 es_query = cls.es_query_builder(app_ids, filter_settings)
146 es_query = cls.es_query_builder(app_ids, filter_settings)
147 sort_query = {"sort": [{"timestamp": {"order": "desc"}}]}
147 sort_query = {"sort": [{"timestamp": {"order": "desc"}}]}
148 es_query.update(sort_query)
148 es_query.update(sort_query)
149 log.debug(es_query)
149 log.debug(es_query)
150 es_from = (page - 1) * items_per_page
150 es_from = (page - 1) * items_per_page
151 index_names = es_index_name_limiter(
151 index_names = es_index_name_limiter(
152 filter_settings.get("start_date"),
152 filter_settings.get("start_date"),
153 filter_settings.get("end_date"),
153 filter_settings.get("end_date"),
154 ixtypes=["logs"],
154 ixtypes=["logs"],
155 )
155 )
156 if not index_names:
156 if not index_names:
157 return {}, 0
157 return {}, 0
158
158
159 results = Datastores.es.search(
159 results = Datastores.es.search(
160 body=es_query,
160 body=es_query,
161 index=index_names,
161 index=index_names,
162 doc_type="log",
162 doc_type="log",
163 size=items_per_page,
163 size=items_per_page,
164 from_=es_from,
164 from_=es_from,
165 )
165 )
166 if results["hits"]["total"] > 5000:
166 if results["hits"]["total"] > 5000:
167 count = 5000
167 count = 5000
168 else:
168 else:
169 count = results["hits"]["total"]
169 count = results["hits"]["total"]
170 return results["hits"], count
170 return results["hits"], count
171
171
172 @classmethod
172 @classmethod
173 def get_paginator_by_app_ids(
173 def get_paginator_by_app_ids(
174 cls,
174 cls,
175 app_ids=None,
175 app_ids=None,
176 page=1,
176 page=1,
177 item_count=None,
177 item_count=None,
178 items_per_page=50,
178 items_per_page=50,
179 order_by=None,
179 order_by=None,
180 filter_settings=None,
180 filter_settings=None,
181 exclude_columns=None,
181 exclude_columns=None,
182 db_session=None,
182 db_session=None,
183 ):
183 ):
184 if not filter_settings:
184 if not filter_settings:
185 filter_settings = {}
185 filter_settings = {}
186 results, item_count = cls.get_search_iterator(
186 results, item_count = cls.get_search_iterator(
187 app_ids, page, items_per_page, order_by, filter_settings
187 app_ids, page, items_per_page, order_by, filter_settings
188 )
188 )
189 paginator = paginate.Page(
189 paginator = paginate.Page(
190 [], item_count=item_count, items_per_page=items_per_page, **filter_settings
190 [], item_count=item_count, items_per_page=items_per_page, **filter_settings
191 )
191 )
192 ordered_ids = tuple(
192 ordered_ids = tuple(
193 item["_source"]["pg_id"] for item in results.get("hits", [])
193 item["_source"]["log_id"] for item in results.get("hits", [])
194 )
194 )
195
195
196 sorted_instance_list = []
196 sorted_instance_list = []
197 if ordered_ids:
197 if ordered_ids:
198 db_session = get_db_session(db_session)
198 db_session = get_db_session(db_session)
199 query = db_session.query(Log)
199 query = db_session.query(Log)
200 query = query.filter(Log.log_id.in_(ordered_ids))
200 query = query.filter(Log.log_id.in_(ordered_ids))
201 query = query.order_by(sa.desc("timestamp"))
201 query = query.order_by(sa.desc("timestamp"))
202 sa_items = query.all()
202 sa_items = query.all()
203 # resort by score
203 # resort by score
204 for i_id in ordered_ids:
204 for i_id in ordered_ids:
205 for item in sa_items:
205 for item in sa_items:
206 if str(item.log_id) == str(i_id):
206 if str(item.log_id) == str(i_id):
207 sorted_instance_list.append(item)
207 sorted_instance_list.append(item)
208 paginator.sa_items = sorted_instance_list
208 paginator.sa_items = sorted_instance_list
209 return paginator
209 return paginator
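The resort step above matches the SQLAlchemy rows back to the Elasticsearch ordering with a nested loop over ordered_ids and sa_items. An equivalent dictionary-based pass (just a sketch using the same local names, not part of this change) would restore the ES order in a single pass:

# sketch: restore the Elasticsearch ordering via a lookup table keyed by log_id
items_by_id = {str(item.log_id): item for item in sa_items}
sorted_instance_list = [
    items_by_id[str(i_id)] for i_id in ordered_ids if str(i_id) in items_by_id
]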
210
210
211 @classmethod
211 @classmethod
212 def query_by_primary_key_and_namespace(cls, list_of_pairs, db_session=None):
212 def query_by_primary_key_and_namespace(cls, list_of_pairs, db_session=None):
213 db_session = get_db_session(db_session)
213 db_session = get_db_session(db_session)
214 list_of_conditions = []
214 list_of_conditions = []
215 query = db_session.query(Log)
215 query = db_session.query(Log)
216 for pair in list_of_pairs:
216 for pair in list_of_pairs:
217 list_of_conditions.append(
217 list_of_conditions.append(
218 sa.and_(Log.primary_key == pair["pk"], Log.namespace == pair["ns"])
218 sa.and_(Log.primary_key == pair["pk"], Log.namespace == pair["ns"])
219 )
219 )
220 query = query.filter(sa.or_(*list_of_conditions))
220 query = query.filter(sa.or_(*list_of_conditions))
221 query = query.order_by(sa.asc(Log.timestamp), sa.asc(Log.log_id))
221 query = query.order_by(sa.asc(Log.timestamp), sa.asc(Log.log_id))
222 return query
222 return query
@@ -1,521 +1,521 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 #
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
7 # You may obtain a copy of the License at
8 #
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
10 #
11 # Unless required by applicable law or agreed to in writing, software
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
15 # limitations under the License.
16
16
17 import logging
17 import logging
18 import paginate
18 import paginate
19 import sqlalchemy as sa
19 import sqlalchemy as sa
20 import appenlight.lib.helpers as h
20 import appenlight.lib.helpers as h
21
21
22 from datetime import datetime
22 from datetime import datetime
23
23
24 from appenlight.models import get_db_session, Datastores
24 from appenlight.models import get_db_session, Datastores
25 from appenlight.models.report import Report
25 from appenlight.models.report import Report
26 from appenlight.models.report_group import ReportGroup
26 from appenlight.models.report_group import ReportGroup
27 from appenlight.models.report_comment import ReportComment
27 from appenlight.models.report_comment import ReportComment
28 from appenlight.models.user import User
28 from appenlight.models.user import User
29 from appenlight.models.services.base import BaseService
29 from appenlight.models.services.base import BaseService
30 from appenlight.lib.enums import ReportType
30 from appenlight.lib.enums import ReportType
31 from appenlight.lib.utils import es_index_name_limiter
31 from appenlight.lib.utils import es_index_name_limiter
32
32
33 log = logging.getLogger(__name__)
33 log = logging.getLogger(__name__)
34
34
35
35
36 class ReportGroupService(BaseService):
36 class ReportGroupService(BaseService):
37 @classmethod
37 @classmethod
38 def get_trending(cls, request, filter_settings, limit=15, db_session=None):
38 def get_trending(cls, request, filter_settings, limit=15, db_session=None):
39 """
39 """
40 Returns report groups trending for a specific time interval
40 Returns report groups trending for a specific time interval
41 """
41 """
42 db_session = get_db_session(db_session)
42 db_session = get_db_session(db_session)
43
43
44 tags = []
44 tags = []
45 if filter_settings.get("tags"):
45 if filter_settings.get("tags"):
46 for tag in filter_settings["tags"]:
46 for tag in filter_settings["tags"]:
47 tags.append(
47 tags.append(
48 {"terms": {"tags.{}.values".format(tag["name"]): tag["value"]}}
48 {"terms": {"tags.{}.values".format(tag["name"]): tag["value"]}}
49 )
49 )
50
50
51 index_names = es_index_name_limiter(
51 index_names = es_index_name_limiter(
52 start_date=filter_settings["start_date"],
52 start_date=filter_settings["start_date"],
53 end_date=filter_settings["end_date"],
53 end_date=filter_settings["end_date"],
54 ixtypes=["reports"],
54 ixtypes=["reports"],
55 )
55 )
56
56
57 if not index_names or not filter_settings["resource"]:
57 if not index_names or not filter_settings["resource"]:
58 return []
58 return []
59
59
60 es_query = {
60 es_query = {
61 "aggs": {
61 "aggs": {
62 "parent_agg": {
62 "parent_agg": {
63 "aggs": {
63 "aggs": {
64 "groups": {
64 "groups": {
65 "aggs": {
65 "aggs": {
66 "sub_agg": {
66 "sub_agg": {
67 "value_count": {"field": "tags.group_id.values.keyword"}
67 "value_count": {"field": "tags.group_id.values.keyword"}
68 }
68 }
69 },
69 },
70 "filter": {"exists": {"field": "tags.group_id.values"}},
70 "filter": {"exists": {"field": "tags.group_id.values"}},
71 }
71 }
72 },
72 },
73 "terms": {"field": "tags.group_id.values.keyword", "size": limit},
73 "terms": {"field": "tags.group_id.values.keyword", "size": limit},
74 }
74 }
75 },
75 },
76 "query": {
76 "query": {
77 "bool": {
77 "bool": {
78 "filter": [
78 "filter": [
79 {
79 {
80 "terms": {
80 "terms": {
81 "resource_id": [filter_settings["resource"][0]]
81 "resource_id": [filter_settings["resource"][0]]
82 }
82 }
83 },
83 },
84 {
84 {
85 "range": {
85 "range": {
86 "timestamp": {
86 "timestamp": {
87 "gte": filter_settings["start_date"],
87 "gte": filter_settings["start_date"],
88 "lte": filter_settings["end_date"],
88 "lte": filter_settings["end_date"],
89 }
89 }
90 }
90 }
91 },
91 },
92 ]
92 ]
93 }
93 }
94 },
94 },
95 }
95 }
96 if tags:
96 if tags:
97 es_query["query"]["bool"]["filter"].extend(tags)
97 es_query["query"]["bool"]["filter"].extend(tags)
98
98
99 result = Datastores.es.search(
99 result = Datastores.es.search(
100 body=es_query, index=index_names, doc_type="log", size=0
100 body=es_query, index=index_names, doc_type="report", size=0
101 )
101 )
102 series = []
102 series = []
103 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
103 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
104 series.append(
104 series.append(
105 {"key": bucket["key"], "groups": bucket["groups"]["sub_agg"]["value"]}
105 {"key": bucket["key"], "groups": bucket["groups"]["sub_agg"]["value"]}
106 )
106 )
107
107
108 report_groups_d = {}
108 report_groups_d = {}
109 for g in series:
109 for g in series:
110 report_groups_d[int(g["key"])] = g["groups"] or 0
110 report_groups_d[int(g["key"])] = g["groups"] or 0
111
111
112 query = db_session.query(ReportGroup)
112 query = db_session.query(ReportGroup)
113 query = query.filter(ReportGroup.id.in_(list(report_groups_d.keys())))
113 query = query.filter(ReportGroup.id.in_(list(report_groups_d.keys())))
114 query = query.options(sa.orm.joinedload(ReportGroup.last_report_ref))
114 query = query.options(sa.orm.joinedload(ReportGroup.last_report_ref))
115 results = [(report_groups_d[group.id], group) for group in query]
115 results = [(report_groups_d[group.id], group) for group in query]
116 return sorted(results, reverse=True, key=lambda x: x[0])
116 return sorted(results, reverse=True, key=lambda x: x[0])
117
117
118 @classmethod
118 @classmethod
119 def get_search_iterator(
119 def get_search_iterator(
120 cls,
120 cls,
121 app_ids=None,
121 app_ids=None,
122 page=1,
122 page=1,
123 items_per_page=50,
123 items_per_page=50,
124 order_by=None,
124 order_by=None,
125 filter_settings=None,
125 filter_settings=None,
126 limit=None,
126 limit=None,
127 ):
127 ):
128 if not app_ids:
128 if not app_ids:
129 return {}
129 return {}
130 if not filter_settings:
130 if not filter_settings:
131 filter_settings = {}
131 filter_settings = {}
132
132
133 query = {
133 query = {
134 "size": 0,
134 "size": 0,
135 "query": {
135 "query": {
136 "bool": {
136 "bool": {
137 "must": [],
137 "must": [],
138 "should": [],
138 "should": [],
139 "filter": [{"terms": {"resource_id": list(app_ids)}}]
139 "filter": [{"terms": {"resource_id": list(app_ids)}}]
140 }
140 }
141 },
141 },
142 "aggs": {
142 "aggs": {
143 "top_groups": {
143 "top_groups": {
144 "terms": {
144 "terms": {
145 "size": 5000,
145 "size": 5000,
146 "field": "_parent#report_group",
146 "field": "join_field#report_group",
147 "order": {"newest": "desc"},
147 "order": {"newest": "desc"},
148 },
148 },
149 "aggs": {
149 "aggs": {
150 "top_reports_hits": {
150 "top_reports_hits": {
151 "top_hits": {"size": 1, "sort": {"start_time": "desc"}}
151 "top_hits": {"size": 1, "sort": {"start_time": "desc"}}
152 },
152 },
153 "newest": {"max": {"field": "start_time"}},
153 "newest": {"max": {"field": "start_time"}},
154 },
154 },
155 }
155 }
156 },
156 },
157 }
157 }
158
158
159 start_date = filter_settings.get("start_date")
159 start_date = filter_settings.get("start_date")
160 end_date = filter_settings.get("end_date")
160 end_date = filter_settings.get("end_date")
161 filter_part = query["query"]["bool"]["filter"]
161 filter_part = query["query"]["bool"]["filter"]
162 date_range = {"range": {"start_time": {}}}
162 date_range = {"range": {"start_time": {}}}
163 if start_date:
163 if start_date:
164 date_range["range"]["start_time"]["gte"] = start_date
164 date_range["range"]["start_time"]["gte"] = start_date
165 if end_date:
165 if end_date:
166 date_range["range"]["start_time"]["lte"] = end_date
166 date_range["range"]["start_time"]["lte"] = end_date
167 if start_date or end_date:
167 if start_date or end_date:
168 filter_part.append(date_range)
168 filter_part.append(date_range)
169
169
170 priorities = filter_settings.get("priority")
170 priorities = filter_settings.get("priority")
171
171
172 for tag in filter_settings.get("tags", []):
172 for tag in filter_settings.get("tags", []):
173 tag_values = [v.lower() for v in tag["value"]]
173 tag_values = [v.lower() for v in tag["value"]]
174 key = "tags.%s.values" % tag["name"].replace(".", "_")
174 key = "tags.%s.values" % tag["name"].replace(".", "_")
175 filter_part.append({"terms": {key: tag_values}})
175 filter_part.append({"terms": {key: tag_values}})
176
176
177 if priorities:
177 if priorities:
178 filter_part.append(
178 filter_part.append(
179 {
179 {
180 "has_parent": {
180 "has_parent": {
181 "parent_type": "report_group",
181 "parent_type": "report_group",
182 "query": {"terms": {"priority": priorities}},
182 "query": {"terms": {"priority": priorities}},
183 }
183 }
184 }
184 }
185 )
185 )
186
186
187 min_occurences = filter_settings.get("min_occurences")
187 min_occurences = filter_settings.get("min_occurences")
188 if min_occurences:
188 if min_occurences:
189 filter_part.append(
189 filter_part.append(
190 {
190 {
191 "has_parent": {
191 "has_parent": {
192 "parent_type": "report_group",
192 "parent_type": "report_group",
193 "query": {"range": {"occurences": {"gte": min_occurences[0]}}},
193 "query": {"range": {"occurences": {"gte": min_occurences[0]}}},
194 }
194 }
195 }
195 }
196 )
196 )
197
197
198 min_duration = filter_settings.get("min_duration")
198 min_duration = filter_settings.get("min_duration")
199 max_duration = filter_settings.get("max_duration")
199 max_duration = filter_settings.get("max_duration")
200
200
201 request_ids = filter_settings.get("request_id")
201 request_ids = filter_settings.get("request_id")
202 if request_ids:
202 if request_ids:
203 filter_part.append({"terms": {"request_id": request_ids}})
203 filter_part.append({"terms": {"request_id": request_ids}})
204
204
205 duration_range = {"range": {"average_duration": {}}}
205 duration_range = {"range": {"average_duration": {}}}
206 if min_duration:
206 if min_duration:
207 duration_range["range"]["average_duration"]["gte"] = min_duration[0]
207 duration_range["range"]["average_duration"]["gte"] = min_duration[0]
208 if max_duration:
208 if max_duration:
209 duration_range["range"]["average_duration"]["lte"] = max_duration[0]
209 duration_range["range"]["average_duration"]["lte"] = max_duration[0]
210 if min_duration or max_duration:
210 if min_duration or max_duration:
211 filter_part.append(
211 filter_part.append(
212 {"has_parent": {"parent_type": "report_group", "query": duration_range}}
212 {"has_parent": {"parent_type": "report_group", "query": duration_range}}
213 )
213 )
214
214
215 http_status = filter_settings.get("http_status")
215 http_status = filter_settings.get("http_status")
216 report_type = filter_settings.get("report_type", [ReportType.error])
216 report_type = filter_settings.get("report_type", [ReportType.error])
217 # set error report type if http status is not found
217 # set error report type if http status is not found
218 # and we are dealing with slow reports
218 # and we are dealing with slow reports
219 if not http_status or ReportType.slow in report_type:
219 if not http_status or ReportType.slow in report_type:
220 filter_part.append({"terms": {"report_type": report_type}})
220 filter_part.append({"terms": {"report_type": report_type}})
221 if http_status:
221 if http_status:
222 filter_part.append({"terms": {"http_status": http_status}})
222 filter_part.append({"terms": {"http_status": http_status}})
223
223
224 messages = filter_settings.get("message")
224 messages = filter_settings.get("message")
225 if messages:
225 if messages:
226 condition = {"match": {"message": " ".join(messages)}}
226 condition = {"match": {"message": " ".join(messages)}}
227 query["query"]["bool"]["must"].append(condition)
227 query["query"]["bool"]["must"].append(condition)
228 errors = filter_settings.get("error")
228 errors = filter_settings.get("error")
229 if errors:
229 if errors:
230 condition = {"match": {"error": " ".join(errors)}}
230 condition = {"match": {"error": " ".join(errors)}}
231 query["query"]["bool"]["must"].append(condition)
231 query["query"]["bool"]["must"].append(condition)
232 url_domains = filter_settings.get("url_domain")
232 url_domains = filter_settings.get("url_domain")
233 if url_domains:
233 if url_domains:
234 condition = {"terms": {"url_domain": url_domains}}
234 condition = {"terms": {"url_domain": url_domains}}
235 query["query"]["bool"]["must"].append(condition)
235 query["query"]["bool"]["must"].append(condition)
236 url_paths = filter_settings.get("url_path")
236 url_paths = filter_settings.get("url_path")
237 if url_paths:
237 if url_paths:
238 condition = {"terms": {"url_path": url_paths}}
238 condition = {"terms": {"url_path": url_paths}}
239 query["query"]["bool"]["must"].append(condition)
239 query["query"]["bool"]["must"].append(condition)
240
240
241 if filter_settings.get("report_status"):
241 if filter_settings.get("report_status"):
242 for status in filter_settings.get("report_status"):
242 for status in filter_settings.get("report_status"):
243 if status == "never_reviewed":
243 if status == "never_reviewed":
244 filter_part.append(
244 filter_part.append(
245 {
245 {
246 "has_parent": {
246 "has_parent": {
247 "parent_type": "report_group",
247 "parent_type": "report_group",
248 "query": {"term": {"read": False}},
248 "query": {"term": {"read": False}},
249 }
249 }
250 }
250 }
251 )
251 )
252 elif status == "reviewed":
252 elif status == "reviewed":
253 filter_part.append(
253 filter_part.append(
254 {
254 {
255 "has_parent": {
255 "has_parent": {
256 "parent_type": "report_group",
256 "parent_type": "report_group",
257 "query": {"term": {"read": True}},
257 "query": {"term": {"read": True}},
258 }
258 }
259 }
259 }
260 )
260 )
261 elif status == "public":
261 elif status == "public":
262 filter_part.append(
262 filter_part.append(
263 {
263 {
264 "has_parent": {
264 "has_parent": {
265 "parent_type": "report_group",
265 "parent_type": "report_group",
266 "query": {"term": {"public": True}},
266 "query": {"term": {"public": True}},
267 }
267 }
268 }
268 }
269 )
269 )
270 elif status == "fixed":
270 elif status == "fixed":
271 filter_part.append(
271 filter_part.append(
272 {
272 {
273 "has_parent": {
273 "has_parent": {
274 "parent_type": "report_group",
274 "parent_type": "report_group",
275 "query": {"term": {"fixed": True}},
275 "query": {"term": {"fixed": True}},
276 }
276 }
277 }
277 }
278 )
278 )
279
279
280 # logging.getLogger('pyelasticsearch').setLevel(logging.DEBUG)
280 # logging.getLogger('pyelasticsearch').setLevel(logging.DEBUG)
281 index_names = es_index_name_limiter(
281 index_names = es_index_name_limiter(
282 filter_settings.get("start_date"),
282 filter_settings.get("start_date"),
283 filter_settings.get("end_date"),
283 filter_settings.get("end_date"),
284 ixtypes=["reports"],
284 ixtypes=["reports"],
285 )
285 )
286 if index_names:
286 if index_names:
287 results = Datastores.es.search(
287 results = Datastores.es.search(
288 body=query,
288 body=query,
289 index=index_names,
289 index=index_names,
290 doc_type=["report", "report_group"],
290 doc_type=["report", "report_group"],
291 size=0,
291 size=0,
292 )
292 )
293 else:
293 else:
294 return []
294 return []
295 return results["aggregations"]
295 return results["aggregations"]
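The top_groups aggregation above now buckets child report documents by the parent-join helper field ("join_field#report_group"), which carries the parent group id on every child and replaces the older "_parent#report_group" field visible on the removed line. A stripped-down sketch of the same idea, with an assumed index name:

from elasticsearch import Elasticsearch

es = Elasticsearch()
agg_query = {
    "size": 0,
    "aggs": {
        "top_groups": {
            # one bucket per parent report_group id, counting its child reports
            "terms": {"field": "join_field#report_group", "size": 10}
        }
    },
}
result = es.search(index="rcae_r_2017_05", body=agg_query)
buckets = result["aggregations"]["top_groups"]["buckets"]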
296
296
297 @classmethod
297 @classmethod
298 def get_paginator_by_app_ids(
298 def get_paginator_by_app_ids(
299 cls,
299 cls,
300 app_ids=None,
300 app_ids=None,
301 page=1,
301 page=1,
302 item_count=None,
302 item_count=None,
303 items_per_page=50,
303 items_per_page=50,
304 order_by=None,
304 order_by=None,
305 filter_settings=None,
305 filter_settings=None,
306 exclude_columns=None,
306 exclude_columns=None,
307 db_session=None,
307 db_session=None,
308 ):
308 ):
309 if not filter_settings:
309 if not filter_settings:
310 filter_settings = {}
310 filter_settings = {}
311 results = cls.get_search_iterator(
311 results = cls.get_search_iterator(
312 app_ids, page, items_per_page, order_by, filter_settings
312 app_ids, page, items_per_page, order_by, filter_settings
313 )
313 )
314
314
315 ordered_ids = []
315 ordered_ids = []
316 if results:
316 if results:
317 for item in results["top_groups"]["buckets"]:
317 for item in results["top_groups"]["buckets"]:
318 pg_id = item["top_reports_hits"]["hits"]["hits"][0]["_source"]["pg_id"]
318 pg_id = item["top_reports_hits"]["hits"]["hits"][0]["_source"]["report_id"]
319 ordered_ids.append(pg_id)
319 ordered_ids.append(pg_id)
320 log.info(filter_settings)
320 log.info(filter_settings)
321 paginator = paginate.Page(
321 paginator = paginate.Page(
322 ordered_ids, items_per_page=items_per_page, **filter_settings
322 ordered_ids, items_per_page=items_per_page, **filter_settings
323 )
323 )
324 sa_items = ()
324 sa_items = ()
325 if paginator.items:
325 if paginator.items:
326 db_session = get_db_session(db_session)
326 db_session = get_db_session(db_session)
327 # latest report detail
327 # latest report detail
328 query = db_session.query(Report)
328 query = db_session.query(Report)
329 query = query.options(sa.orm.joinedload(Report.report_group))
329 query = query.options(sa.orm.joinedload(Report.report_group))
330 query = query.filter(Report.id.in_(paginator.items))
330 query = query.filter(Report.id.in_(paginator.items))
331 if filter_settings.get("order_col"):
331 if filter_settings.get("order_col"):
332 order_col = filter_settings.get("order_col")
332 order_col = filter_settings.get("order_col")
333 if filter_settings.get("order_dir") == "dsc":
333 if filter_settings.get("order_dir") == "dsc":
334 sort_on = "desc"
334 sort_on = "desc"
335 else:
335 else:
336 sort_on = "asc"
336 sort_on = "asc"
337 if order_col == "when":
337 if order_col == "when":
338 order_col = "last_timestamp"
338 order_col = "last_timestamp"
339 query = query.order_by(
339 query = query.order_by(
340 getattr(sa, sort_on)(getattr(ReportGroup, order_col))
340 getattr(sa, sort_on)(getattr(ReportGroup, order_col))
341 )
341 )
342 sa_items = query.all()
342 sa_items = query.all()
343 sorted_instance_list = []
343 sorted_instance_list = []
344 for i_id in ordered_ids:
344 for i_id in ordered_ids:
345 for report in sa_items:
345 for report in sa_items:
346 if str(report.id) == i_id and report not in sorted_instance_list:
346 if str(report.id) == i_id and report not in sorted_instance_list:
347 sorted_instance_list.append(report)
347 sorted_instance_list.append(report)
348 paginator.sa_items = sorted_instance_list
348 paginator.sa_items = sorted_instance_list
349 return paginator
349 return paginator
350
350
351 @classmethod
351 @classmethod
352 def by_app_ids(cls, app_ids=None, order_by=True, db_session=None):
352 def by_app_ids(cls, app_ids=None, order_by=True, db_session=None):
353 db_session = get_db_session(db_session)
353 db_session = get_db_session(db_session)
354 q = db_session.query(ReportGroup)
354 q = db_session.query(ReportGroup)
355 if app_ids:
355 if app_ids:
356 q = q.filter(ReportGroup.resource_id.in_(app_ids))
356 q = q.filter(ReportGroup.resource_id.in_(app_ids))
357 if order_by:
357 if order_by:
358 q = q.order_by(sa.desc(ReportGroup.id))
358 q = q.order_by(sa.desc(ReportGroup.id))
359 return q
359 return q
360
360
361 @classmethod
361 @classmethod
362 def by_id(cls, group_id, app_ids=None, db_session=None):
362 def by_id(cls, group_id, app_ids=None, db_session=None):
363 db_session = get_db_session(db_session)
363 db_session = get_db_session(db_session)
364 q = db_session.query(ReportGroup).filter(ReportGroup.id == int(group_id))
364 q = db_session.query(ReportGroup).filter(ReportGroup.id == int(group_id))
365 if app_ids:
365 if app_ids:
366 q = q.filter(ReportGroup.resource_id.in_(app_ids))
366 q = q.filter(ReportGroup.resource_id.in_(app_ids))
367 return q.first()
367 return q.first()
368
368
369 @classmethod
369 @classmethod
370 def by_ids(cls, group_ids=None, db_session=None):
370 def by_ids(cls, group_ids=None, db_session=None):
371 db_session = get_db_session(db_session)
371 db_session = get_db_session(db_session)
372 query = db_session.query(ReportGroup)
372 query = db_session.query(ReportGroup)
373 query = query.filter(ReportGroup.id.in_(group_ids))
373 query = query.filter(ReportGroup.id.in_(group_ids))
374 return query
374 return query
375
375
376 @classmethod
376 @classmethod
377 def by_hash_and_resource(
377 def by_hash_and_resource(
378 cls, resource_id, grouping_hash, since_when=None, db_session=None
378 cls, resource_id, grouping_hash, since_when=None, db_session=None
379 ):
379 ):
380 db_session = get_db_session(db_session)
380 db_session = get_db_session(db_session)
381 q = db_session.query(ReportGroup)
381 q = db_session.query(ReportGroup)
382 q = q.filter(ReportGroup.resource_id == resource_id)
382 q = q.filter(ReportGroup.resource_id == resource_id)
383 q = q.filter(ReportGroup.grouping_hash == grouping_hash)
383 q = q.filter(ReportGroup.grouping_hash == grouping_hash)
384 q = q.filter(ReportGroup.fixed == False)
384 q = q.filter(ReportGroup.fixed == False)
385 if since_when:
385 if since_when:
386 q = q.filter(ReportGroup.first_timestamp >= since_when)
386 q = q.filter(ReportGroup.first_timestamp >= since_when)
387 return q.first()
387 return q.first()
388
388
389 @classmethod
389 @classmethod
390 def users_commenting(cls, report_group, exclude_user_id=None, db_session=None):
390 def users_commenting(cls, report_group, exclude_user_id=None, db_session=None):
391 db_session = get_db_session(None, report_group)
391 db_session = get_db_session(None, report_group)
392 query = db_session.query(User).distinct()
392 query = db_session.query(User).distinct()
393 query = query.filter(User.id == ReportComment.owner_id)
393 query = query.filter(User.id == ReportComment.owner_id)
394 query = query.filter(ReportComment.group_id == report_group.id)
394 query = query.filter(ReportComment.group_id == report_group.id)
395 if exclude_user_id:
395 if exclude_user_id:
396 query = query.filter(ReportComment.owner_id != exclude_user_id)
396 query = query.filter(ReportComment.owner_id != exclude_user_id)
397 return query
397 return query
398
398
399 @classmethod
399 @classmethod
400 def affected_users_count(cls, report_group, db_session=None):
400 def affected_users_count(cls, report_group, db_session=None):
401 db_session = get_db_session(db_session)
401 db_session = get_db_session(db_session)
402 query = db_session.query(sa.func.count(Report.username))
402 query = db_session.query(sa.func.count(Report.username))
403 query = query.filter(Report.group_id == report_group.id)
403 query = query.filter(Report.group_id == report_group.id)
404 query = query.filter(Report.username != "")
404 query = query.filter(Report.username != "")
405 query = query.filter(Report.username != None)
405 query = query.filter(Report.username != None)
406 query = query.group_by(Report.username)
406 query = query.group_by(Report.username)
407 return query.count()
407 return query.count()
408
408
409 @classmethod
409 @classmethod
410 def top_affected_users(cls, report_group, db_session=None):
410 def top_affected_users(cls, report_group, db_session=None):
411 db_session = get_db_session(db_session)
411 db_session = get_db_session(db_session)
412 count_label = sa.func.count(Report.username).label("count")
412 count_label = sa.func.count(Report.username).label("count")
413 query = db_session.query(Report.username, count_label)
413 query = db_session.query(Report.username, count_label)
414 query = query.filter(Report.group_id == report_group.id)
414 query = query.filter(Report.group_id == report_group.id)
415 query = query.filter(Report.username != None)
415 query = query.filter(Report.username != None)
416 query = query.filter(Report.username != "")
416 query = query.filter(Report.username != "")
417 query = query.group_by(Report.username)
417 query = query.group_by(Report.username)
418 query = query.order_by(sa.desc(count_label))
418 query = query.order_by(sa.desc(count_label))
419 query = query.limit(50)
419 query = query.limit(50)
420 return query
420 return query
421
421
422 @classmethod
422 @classmethod
423 def get_report_stats(cls, request, filter_settings):
423 def get_report_stats(cls, request, filter_settings):
424 """
424 """
425 Gets report dashboard graphs
425 Gets report dashboard graphs
426 Returns information for BAR charts with occurrences/interval information
426 Returns information for BAR charts with occurrences/interval information
427 detailed means the version that returns time intervals - non-detailed
427 detailed means the version that returns time intervals - non-detailed
428 returns the total sum
428 returns the total sum
429 """
429 """
430 delta = filter_settings["end_date"] - filter_settings["start_date"]
430 delta = filter_settings["end_date"] - filter_settings["start_date"]
431 if delta < h.time_deltas.get("12h")["delta"]:
431 if delta < h.time_deltas.get("12h")["delta"]:
432 interval = "1m"
432 interval = "1m"
433 elif delta <= h.time_deltas.get("3d")["delta"]:
433 elif delta <= h.time_deltas.get("3d")["delta"]:
434 interval = "5m"
434 interval = "5m"
435 elif delta >= h.time_deltas.get("2w")["delta"]:
435 elif delta >= h.time_deltas.get("2w")["delta"]:
436 interval = "24h"
436 interval = "24h"
437 else:
437 else:
438 interval = "1h"
438 interval = "1h"
439
439
440 group_id = filter_settings.get("group_id")
440 group_id = filter_settings.get("group_id")
441
441
442 es_query = {
442 es_query = {
443 "aggs": {
443 "aggs": {
444 "parent_agg": {
444 "parent_agg": {
445 "aggs": {
445 "aggs": {
446 "types": {
446 "types": {
447 "aggs": {
447 "aggs": {
448 "sub_agg": {"terms": {"field": "tags.type.values.keyword"}}
448 "sub_agg": {"terms": {"field": "tags.type.values.keyword"}}
449 },
449 },
450 "filter": {
450 "filter": {
451 "bool": {
451 "bool": {
452 "filter": [{"exists": {"field": "tags.type.values"}}]
452 "filter": [{"exists": {"field": "tags.type.values"}}]
453 }
453 }
454 },
454 },
455 }
455 }
456 },
456 },
457 "date_histogram": {
457 "date_histogram": {
458 "extended_bounds": {
458 "extended_bounds": {
459 "max": filter_settings["end_date"],
459 "max": filter_settings["end_date"],
460 "min": filter_settings["start_date"],
460 "min": filter_settings["start_date"],
461 },
461 },
462 "field": "timestamp",
462 "field": "timestamp",
463 "interval": interval,
463 "interval": interval,
464 "min_doc_count": 0,
464 "min_doc_count": 0,
465 },
465 },
466 }
466 }
467 },
467 },
468 "query": {
468 "query": {
469 "bool": {
469 "bool": {
470 "filter": [
470 "filter": [
471 {
471 {
472 "terms": {
472 "terms": {
473 "resource_id": [filter_settings["resource"][0]]
473 "resource_id": [filter_settings["resource"][0]]
474 }
474 }
475 },
475 },
476 {
476 {
477 "range": {
477 "range": {
478 "timestamp": {
478 "timestamp": {
479 "gte": filter_settings["start_date"],
479 "gte": filter_settings["start_date"],
480 "lte": filter_settings["end_date"],
480 "lte": filter_settings["end_date"],
481 }
481 }
482 }
482 }
483 },
483 },
484 ]
484 ]
485 }
485 }
486 },
486 },
487 }
487 }
488 if group_id:
488 if group_id:
489 parent_agg = es_query["aggs"]["parent_agg"]
489 parent_agg = es_query["aggs"]["parent_agg"]
490 filters = parent_agg["aggs"]["types"]["filter"]["bool"]["filter"]
490 filters = parent_agg["aggs"]["types"]["filter"]["bool"]["filter"]
491 filters.append({"terms": {"tags.group_id.values": [group_id]}})
491 filters.append({"terms": {"tags.group_id.values": [group_id]}})
492
492
493 index_names = es_index_name_limiter(
493 index_names = es_index_name_limiter(
494 start_date=filter_settings["start_date"],
494 start_date=filter_settings["start_date"],
495 end_date=filter_settings["end_date"],
495 end_date=filter_settings["end_date"],
496 ixtypes=["reports"],
496 ixtypes=["reports"],
497 )
497 )
498
498
499 if not index_names:
499 if not index_names:
500 return []
500 return []
501
501
502 result = Datastores.es.search(
502 result = Datastores.es.search(
503 body=es_query, index=index_names, doc_type="log", size=0
503 body=es_query, index=index_names, doc_type="log", size=0
504 )
504 )
505 series = []
505 series = []
506 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
506 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
507 point = {
507 point = {
508 "x": datetime.utcfromtimestamp(int(bucket["key"]) / 1000),
508 "x": datetime.utcfromtimestamp(int(bucket["key"]) / 1000),
509 "report": 0,
509 "report": 0,
510 "not_found": 0,
510 "not_found": 0,
511 "slow_report": 0,
511 "slow_report": 0,
512 }
512 }
513 for subbucket in bucket["types"]["sub_agg"]["buckets"]:
513 for subbucket in bucket["types"]["sub_agg"]["buckets"]:
514 if subbucket["key"] == "slow":
514 if subbucket["key"] == "slow":
515 point["slow_report"] = subbucket["doc_count"]
515 point["slow_report"] = subbucket["doc_count"]
516 elif subbucket["key"] == "error":
516 elif subbucket["key"] == "error":
517 point["report"] = subbucket["doc_count"]
517 point["report"] = subbucket["doc_count"]
518 elif subbucket["key"] == "not_found":
518 elif subbucket["key"] == "not_found":
519 point["not_found"] = subbucket["doc_count"]
519 point["not_found"] = subbucket["doc_count"]
520 series.append(point)
520 series.append(point)
521 return series
521 return series
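Each date_histogram bucket above becomes one point in the returned series; the bucket key is an epoch timestamp in milliseconds, hence the utcfromtimestamp(key / 1000) conversion. The result has roughly this shape (the values here are invented for illustration):

from datetime import datetime

series = [
    {"x": datetime(2017, 5, 1, 10, 0), "report": 4, "not_found": 0, "slow_report": 1},
    {"x": datetime(2017, 5, 1, 11, 0), "report": 2, "not_found": 1, "slow_report": 0},
]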
@@ -1,612 +1,612 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 #
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
7 # You may obtain a copy of the License at
8 #
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
10 #
11 # Unless required by applicable law or agreed to in writing, software
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
15 # limitations under the License.
16
16
17 from datetime import datetime
17 from datetime import datetime
18
18
19 import appenlight.lib.helpers as h
19 import appenlight.lib.helpers as h
20 from appenlight.models import get_db_session, Datastores
20 from appenlight.models import get_db_session, Datastores
21 from appenlight.models.services.base import BaseService
21 from appenlight.models.services.base import BaseService
22 from appenlight.lib.enums import ReportType
22 from appenlight.lib.enums import ReportType
23 from appenlight.lib.utils import es_index_name_limiter
23 from appenlight.lib.utils import es_index_name_limiter
24
24
25 try:
25 try:
26 from ae_uptime_ce.models.services.uptime_metric import UptimeMetricService
26 from ae_uptime_ce.models.services.uptime_metric import UptimeMetricService
27 except ImportError:
27 except ImportError:
28 UptimeMetricService = None
28 UptimeMetricService = None
29
29
30
30
31 def check_key(key, stats, uptime, total_seconds):
31 def check_key(key, stats, uptime, total_seconds):
32 if key not in stats:
32 if key not in stats:
33 stats[key] = {
33 stats[key] = {
34 "name": key,
34 "name": key,
35 "requests": 0,
35 "requests": 0,
36 "errors": 0,
36 "errors": 0,
37 "tolerated_requests": 0,
37 "tolerated_requests": 0,
38 "frustrating_requests": 0,
38 "frustrating_requests": 0,
39 "satisfying_requests": 0,
39 "satisfying_requests": 0,
40 "total_minutes": total_seconds / 60.0,
40 "total_minutes": total_seconds / 60.0,
41 "uptime": uptime,
41 "uptime": uptime,
42 "apdex": 0,
42 "apdex": 0,
43 "rpm": 0,
43 "rpm": 0,
44 "response_time": 0,
44 "response_time": 0,
45 "avg_response_time": 0,
45 "avg_response_time": 0,
46 }
46 }
47
47
48
48
49 class RequestMetricService(BaseService):
49 class RequestMetricService(BaseService):
50 @classmethod
50 @classmethod
51 def get_metrics_stats(cls, request, filter_settings, db_session=None):
51 def get_metrics_stats(cls, request, filter_settings, db_session=None):
52 delta = filter_settings["end_date"] - filter_settings["start_date"]
52 delta = filter_settings["end_date"] - filter_settings["start_date"]
53 if delta < h.time_deltas.get("12h")["delta"]:
53 if delta < h.time_deltas.get("12h")["delta"]:
54 interval = "1m"
54 interval = "1m"
55 elif delta <= h.time_deltas.get("3d")["delta"]:
55 elif delta <= h.time_deltas.get("3d")["delta"]:
56 interval = "5m"
56 interval = "5m"
57 elif delta >= h.time_deltas.get("2w")["delta"]:
57 elif delta >= h.time_deltas.get("2w")["delta"]:
58 interval = "24h"
58 interval = "24h"
59 else:
59 else:
60 interval = "1h"
60 interval = "1h"
61
61
62 filter_settings["namespace"] = ["appenlight.request_metric"]
62 filter_settings["namespace"] = ["appenlight.request_metric"]
63
63
64 es_query = {
64 es_query = {
65 "aggs": {
65 "aggs": {
66 "parent_agg": {
66 "parent_agg": {
67 "aggs": {
67 "aggs": {
68 "custom": {
68 "custom": {
69 "aggs": {
69 "aggs": {
70 "sub_agg": {
70 "sub_agg": {
71 "sum": {"field": "tags.custom.numeric_values"}
71 "sum": {"field": "tags.custom.numeric_values"}
72 }
72 }
73 },
73 },
74 "filter": {
74 "filter": {
75 "exists": {"field": "tags.custom.numeric_values"}
75 "exists": {"field": "tags.custom.numeric_values"}
76 },
76 },
77 },
77 },
78 "main": {
78 "main": {
79 "aggs": {
79 "aggs": {
80 "sub_agg": {
80 "sub_agg": {
81 "sum": {"field": "tags.main.numeric_values"}
81 "sum": {"field": "tags.main.numeric_values"}
82 }
82 }
83 },
83 },
84 "filter": {"exists": {"field": "tags.main.numeric_values"}},
84 "filter": {"exists": {"field": "tags.main.numeric_values"}},
85 },
85 },
86 "nosql": {
86 "nosql": {
87 "aggs": {
87 "aggs": {
88 "sub_agg": {
88 "sub_agg": {
89 "sum": {"field": "tags.nosql.numeric_values"}
89 "sum": {"field": "tags.nosql.numeric_values"}
90 }
90 }
91 },
91 },
92 "filter": {
92 "filter": {
93 "exists": {"field": "tags.nosql.numeric_values"}
93 "exists": {"field": "tags.nosql.numeric_values"}
94 },
94 },
95 },
95 },
96 "remote": {
96 "remote": {
97 "aggs": {
97 "aggs": {
98 "sub_agg": {
98 "sub_agg": {
99 "sum": {"field": "tags.remote.numeric_values"}
99 "sum": {"field": "tags.remote.numeric_values"}
100 }
100 }
101 },
101 },
102 "filter": {
102 "filter": {
103 "exists": {"field": "tags.remote.numeric_values"}
103 "exists": {"field": "tags.remote.numeric_values"}
104 },
104 },
105 },
105 },
106 "requests": {
106 "requests": {
107 "aggs": {
107 "aggs": {
108 "sub_agg": {
108 "sub_agg": {
109 "sum": {"field": "tags.requests.numeric_values"}
109 "sum": {"field": "tags.requests.numeric_values"}
110 }
110 }
111 },
111 },
112 "filter": {
112 "filter": {
113 "exists": {"field": "tags.requests.numeric_values"}
113 "exists": {"field": "tags.requests.numeric_values"}
114 },
114 },
115 },
115 },
116 "sql": {
116 "sql": {
117 "aggs": {
117 "aggs": {
118 "sub_agg": {"sum": {"field": "tags.sql.numeric_values"}}
118 "sub_agg": {"sum": {"field": "tags.sql.numeric_values"}}
119 },
119 },
120 "filter": {"exists": {"field": "tags.sql.numeric_values"}},
120 "filter": {"exists": {"field": "tags.sql.numeric_values"}},
121 },
121 },
122 "tmpl": {
122 "tmpl": {
123 "aggs": {
123 "aggs": {
124 "sub_agg": {
124 "sub_agg": {
125 "sum": {"field": "tags.tmpl.numeric_values"}
125 "sum": {"field": "tags.tmpl.numeric_values"}
126 }
126 }
127 },
127 },
128 "filter": {"exists": {"field": "tags.tmpl.numeric_values"}},
128 "filter": {"exists": {"field": "tags.tmpl.numeric_values"}},
129 },
129 },
130 },
130 },
131 "date_histogram": {
131 "date_histogram": {
132 "extended_bounds": {
132 "extended_bounds": {
133 "max": filter_settings["end_date"],
133 "max": filter_settings["end_date"],
134 "min": filter_settings["start_date"],
134 "min": filter_settings["start_date"],
135 },
135 },
136 "field": "timestamp",
136 "field": "timestamp",
137 "interval": interval,
137 "interval": interval,
138 "min_doc_count": 0,
138 "min_doc_count": 0,
139 },
139 },
140 }
140 }
141 },
141 },
142 "query": {
142 "query": {
143 "bool": {
143 "bool": {
144 "filter": [
144 "filter": [
145 {
145 {
146 "terms": {
146 "terms": {
147 "resource_id": [filter_settings["resource"][0]]
147 "resource_id": [filter_settings["resource"][0]]
148 }
148 }
149 },
149 },
150 {
150 {
151 "range": {
151 "range": {
152 "timestamp": {
152 "timestamp": {
153 "gte": filter_settings["start_date"],
153 "gte": filter_settings["start_date"],
154 "lte": filter_settings["end_date"],
154 "lte": filter_settings["end_date"],
155 }
155 }
156 }
156 }
157 },
157 },
158 {"terms": {"namespace": ["appenlight.request_metric"]}},
158 {"terms": {"namespace": ["appenlight.request_metric"]}},
159 ]
159 ]
160 }
160 }
161 },
161 },
162 }
162 }
163
163
164 index_names = es_index_name_limiter(
164 index_names = es_index_name_limiter(
165 start_date=filter_settings["start_date"],
165 start_date=filter_settings["start_date"],
166 end_date=filter_settings["end_date"],
166 end_date=filter_settings["end_date"],
167 ixtypes=["metrics"],
167 ixtypes=["metrics"],
168 )
168 )
169 if not index_names:
169 if not index_names:
170 return []
170 return []
171
171
172 result = Datastores.es.search(
172 result = Datastores.es.search(
173 body=es_query, index=index_names, doc_type="log", size=0
173 body=es_query, index=index_names, doc_type="log", size=0
174 )
174 )
175
175
176 plot_data = []
176 plot_data = []
177 for item in result["aggregations"]["parent_agg"]["buckets"]:
177 for item in result["aggregations"]["parent_agg"]["buckets"]:
178 x_time = datetime.utcfromtimestamp(int(item["key"]) / 1000)
178 x_time = datetime.utcfromtimestamp(int(item["key"]) / 1000)
179 point = {"x": x_time}
179 point = {"x": x_time}
180 for key in ["custom", "main", "nosql", "remote", "requests", "sql", "tmpl"]:
180 for key in ["custom", "main", "nosql", "remote", "requests", "sql", "tmpl"]:
181 value = item[key]["sub_agg"]["value"]
181 value = item[key]["sub_agg"]["value"]
182 point[key] = round(value, 3) if value else 0
182 point[key] = round(value, 3) if value else 0
183 plot_data.append(point)
183 plot_data.append(point)
184
184
185 return plot_data
185 return plot_data
186
186
187 @classmethod
187 @classmethod
188 def get_requests_breakdown(cls, request, filter_settings, db_session=None):
188 def get_requests_breakdown(cls, request, filter_settings, db_session=None):
189 db_session = get_db_session(db_session)
189 db_session = get_db_session(db_session)
190
190
191 # fetch total time of all requests in this time range
191 # fetch total time of all requests in this time range
192 index_names = es_index_name_limiter(
192 index_names = es_index_name_limiter(
193 start_date=filter_settings["start_date"],
193 start_date=filter_settings["start_date"],
194 end_date=filter_settings["end_date"],
194 end_date=filter_settings["end_date"],
195 ixtypes=["metrics"],
195 ixtypes=["metrics"],
196 )
196 )
197
197
198 if index_names and filter_settings["resource"]:
198 if index_names and filter_settings["resource"]:
199 es_query = {
199 es_query = {
200 "aggs": {
200 "aggs": {
201 "main": {
201 "main": {
202 "aggs": {
202 "aggs": {
203 "sub_agg": {"sum": {"field": "tags.main.numeric_values"}}
203 "sub_agg": {"sum": {"field": "tags.main.numeric_values"}}
204 },
204 },
205 "filter": {"exists": {"field": "tags.main.numeric_values"}},
205 "filter": {"exists": {"field": "tags.main.numeric_values"}},
206 }
206 }
207 },
207 },
208 "query": {
208 "query": {
209 "bool": {
209 "bool": {
210 "filter": [
210 "filter": [
211 {
211 {
212 "terms": {
212 "terms": {
213 "resource_id": [filter_settings["resource"][0]]
213 "resource_id": [filter_settings["resource"][0]]
214 }
214 }
215 },
215 },
216 {
216 {
217 "range": {
217 "range": {
218 "timestamp": {
218 "timestamp": {
219 "gte": filter_settings["start_date"],
219 "gte": filter_settings["start_date"],
220 "lte": filter_settings["end_date"],
220 "lte": filter_settings["end_date"],
221 }
221 }
222 }
222 }
223 },
223 },
224 {"terms": {"namespace": ["appenlight.request_metric"]}},
224 {"terms": {"namespace": ["appenlight.request_metric"]}},
225 ]
225 ]
226 }
226 }
227 },
227 },
228 }
228 }
229 result = Datastores.es.search(
229 result = Datastores.es.search(
230 body=es_query, index=index_names, doc_type="log", size=0
230 body=es_query, index=index_names, doc_type="log", size=0
231 )
231 )
232 total_time_spent = result["aggregations"]["main"]["sub_agg"]["value"]
232 total_time_spent = result["aggregations"]["main"]["sub_agg"]["value"]
233 else:
233 else:
234 total_time_spent = 0
234 total_time_spent = 0
235 script_text = "doc['tags.main.numeric_values'].value / {}".format(
235 script_text = "doc['tags.main.numeric_values'].value / {}".format(
236 total_time_spent
236 total_time_spent
237 )
237 )
238 if total_time_spent == 0:
238 if total_time_spent == 0:
239 script_text = '0'
239 script_text = '0'
240
240
241 if index_names and filter_settings["resource"]:
241 if index_names and filter_settings["resource"]:
242 es_query = {
242 es_query = {
243 "aggs": {
243 "aggs": {
244 "parent_agg": {
244 "parent_agg": {
245 "aggs": {
245 "aggs": {
246 "main": {
246 "main": {
247 "aggs": {
247 "aggs": {
248 "sub_agg": {
248 "sub_agg": {
249 "sum": {"field": "tags.main.numeric_values"}
249 "sum": {"field": "tags.main.numeric_values"}
250 }
250 }
251 },
251 },
252 "filter": {
252 "filter": {
253 "exists": {"field": "tags.main.numeric_values"}
253 "exists": {"field": "tags.main.numeric_values"}
254 },
254 },
255 },
255 },
256 "percentage": {
256 "percentage": {
257 "aggs": {
257 "aggs": {
258 "sub_agg": {
258 "sub_agg": {
259 "sum": {
259 "sum": {
260 "script": script_text,
260 "script": script_text,
261 }
261 }
262 }
262 }
263 },
263 },
264 "filter": {
264 "filter": {
265 "exists": {"field": "tags.main.numeric_values"}
265 "exists": {"field": "tags.main.numeric_values"}
266 },
266 },
267 },
267 },
268 "requests": {
268 "requests": {
269 "aggs": {
269 "aggs": {
270 "sub_agg": {
270 "sub_agg": {
271 "sum": {"field": "tags.requests.numeric_values"}
271 "sum": {"field": "tags.requests.numeric_values"}
272 }
272 }
273 },
273 },
274 "filter": {
274 "filter": {
275 "exists": {"field": "tags.requests.numeric_values"}
275 "exists": {"field": "tags.requests.numeric_values"}
276 },
276 },
277 },
277 },
278 },
278 },
279 "terms": {
279 "terms": {
280 "field": "tags.view_name.values.keyword",
280 "field": "tags.view_name.values.keyword",
281 "order": {"percentage>sub_agg": "desc"},
281 "order": {"percentage>sub_agg": "desc"},
282 "size": 15,
282 "size": 15,
283 },
283 },
284 }
284 }
285 },
285 },
286 "query": {
286 "query": {
287 "bool": {
287 "bool": {
288 "filter": [
288 "filter": [
289 {
289 {
290 "terms": {
290 "terms": {
291 "resource_id": [filter_settings["resource"][0]]
291 "resource_id": [filter_settings["resource"][0]]
292 }
292 }
293 },
293 },
294 {
294 {
295 "range": {
295 "range": {
296 "timestamp": {
296 "timestamp": {
297 "gte": filter_settings["start_date"],
297 "gte": filter_settings["start_date"],
298 "lte": filter_settings["end_date"],
298 "lte": filter_settings["end_date"],
299 }
299 }
300 }
300 }
301 },
301 },
302 ]
302 ]
303 }
303 }
304 },
304 },
305 }
305 }
306 result = Datastores.es.search(
306 result = Datastores.es.search(
307 body=es_query, index=index_names, doc_type="log", size=0
307 body=es_query, index=index_names, doc_type="log", size=0
308 )
308 )
309 series = result["aggregations"]["parent_agg"]["buckets"]
309 series = result["aggregations"]["parent_agg"]["buckets"]
310 else:
310 else:
311 series = []
311 series = []
312
312
313 and_part = [
313 and_part = [
314 {"term": {"resource_id": filter_settings["resource"][0]}},
314 {"term": {"resource_id": filter_settings["resource"][0]}},
315 {"terms": {"tags.view_name.values": [row["key"] for row in series]}},
315 {"terms": {"tags.view_name.values": [row["key"] for row in series]}},
316 {"term": {"report_type": str(ReportType.slow)}},
316 {"term": {"report_type": str(ReportType.slow)}},
317 ]
317 ]
318 query = {
318 query = {
319 "aggs": {
319 "aggs": {
320 "top_reports": {
320 "top_reports": {
321 "terms": {"field": "tags.view_name.values.keyword", "size": len(series)},
321 "terms": {"field": "tags.view_name.values.keyword", "size": len(series)},
322 "aggs": {
322 "aggs": {
323 "top_calls_hits": {
323 "top_calls_hits": {
324 "top_hits": {"sort": {"start_time": "desc"}, "size": 5}
324 "top_hits": {"sort": {"start_time": "desc"}, "size": 5}
325 }
325 }
326 },
326 },
327 }
327 }
328 },
328 },
329 "query": {"bool": {"filter": and_part}},
329 "query": {"bool": {"filter": and_part}},
330 }
330 }
331 details = {}
331 details = {}
332 index_names = es_index_name_limiter(ixtypes=["reports"])
332 index_names = es_index_name_limiter(ixtypes=["reports"])
333 if index_names and series:
333 if index_names and series:
334 result = Datastores.es.search(
334 result = Datastores.es.search(
335 body=query, doc_type="report", size=0, index=index_names
335 body=query, doc_type="report", size=0, index=index_names
336 )
336 )
337 for bucket in result["aggregations"]["top_reports"]["buckets"]:
337 for bucket in result["aggregations"]["top_reports"]["buckets"]:
338 details[bucket["key"]] = []
338 details[bucket["key"]] = []
339
339
340 for hit in bucket["top_calls_hits"]["hits"]["hits"]:
340 for hit in bucket["top_calls_hits"]["hits"]["hits"]:
341 details[bucket["key"]].append(
341 details[bucket["key"]].append(
342 {
342 {
343 "report_id": hit["_source"]["pg_id"],
343 "report_id": hit["_source"]["request_metric_id"],
344 "group_id": hit["_source"]["group_id"],
344 "group_id": hit["_source"]["group_id"],
345 }
345 }
346 )
346 )
347
347
348 results = []
348 results = []
349 for row in series:
349 for row in series:
350 result = {
350 result = {
351 "key": row["key"],
351 "key": row["key"],
352 "main": row["main"]["sub_agg"]["value"],
352 "main": row["main"]["sub_agg"]["value"],
353 "requests": row["requests"]["sub_agg"]["value"],
353 "requests": row["requests"]["sub_agg"]["value"],
354 }
354 }
355 # es can return 'infinity'
355 # es can return 'infinity'
356 try:
356 try:
357 result["percentage"] = float(row["percentage"]["sub_agg"]["value"])
357 result["percentage"] = float(row["percentage"]["sub_agg"]["value"])
358 except ValueError:
358 except ValueError:
359 result["percentage"] = 0
359 result["percentage"] = 0
360
360
361 result["latest_details"] = details.get(row["key"]) or []
361 result["latest_details"] = details.get(row["key"]) or []
362 results.append(result)
362 results.append(result)
363
363
364 return results
364 return results
365
365
366 @classmethod
366 @classmethod
367 def get_apdex_stats(cls, request, filter_settings, threshold=1, db_session=None):
367 def get_apdex_stats(cls, request, filter_settings, threshold=1, db_session=None):
368 """
368 """
369 Returns per-server stats and calculates the APDEX score for the dashboard
369 Returns per-server stats and calculates the APDEX score for the dashboard
370 server information boxes (upper right stats boxes)
370 server information boxes (upper right stats boxes)
371 """
371 """
372 # Apdex t = (Satisfied Count + Tolerated Count / 2) / Total Samples
372 # Apdex t = (Satisfied Count + Tolerated Count / 2) / Total Samples
373 db_session = get_db_session(db_session)
373 db_session = get_db_session(db_session)
374 index_names = es_index_name_limiter(
374 index_names = es_index_name_limiter(
375 start_date=filter_settings["start_date"],
375 start_date=filter_settings["start_date"],
376 end_date=filter_settings["end_date"],
376 end_date=filter_settings["end_date"],
377 ixtypes=["metrics"],
377 ixtypes=["metrics"],
378 )
378 )
379
379
380 requests_series = []
380 requests_series = []
381
381
382 if index_names and filter_settings["resource"]:
382 if index_names and filter_settings["resource"]:
383 es_query = {
383 es_query = {
384 "aggs": {
384 "aggs": {
385 "parent_agg": {
385 "parent_agg": {
386 "aggs": {
386 "aggs": {
387 "frustrating": {
387 "frustrating": {
388 "aggs": {
388 "aggs": {
389 "sub_agg": {
389 "sub_agg": {
390 "sum": {"field": "tags.requests.numeric_values"}
390 "sum": {"field": "tags.requests.numeric_values"}
391 }
391 }
392 },
392 },
393 "filter": {
393 "filter": {
394 "bool": {
394 "bool": {
395 "filter": [
395 "filter": [
396 {
396 {
397 "range": {
397 "range": {
398 "tags.main.numeric_values": {"gte": "4"}
398 "tags.main.numeric_values": {"gte": "4"}
399 }
399 }
400 },
400 },
401 {
401 {
402 "exists": {
402 "exists": {
403 "field": "tags.requests.numeric_values"
403 "field": "tags.requests.numeric_values"
404 }
404 }
405 },
405 },
406 ]
406 ]
407 }
407 }
408 },
408 },
409 },
409 },
410 "main": {
410 "main": {
411 "aggs": {
411 "aggs": {
412 "sub_agg": {
412 "sub_agg": {
413 "sum": {"field": "tags.main.numeric_values"}
413 "sum": {"field": "tags.main.numeric_values"}
414 }
414 }
415 },
415 },
416 "filter": {
416 "filter": {
417 "exists": {"field": "tags.main.numeric_values"}
417 "exists": {"field": "tags.main.numeric_values"}
418 },
418 },
419 },
419 },
420 "requests": {
420 "requests": {
421 "aggs": {
421 "aggs": {
422 "sub_agg": {
422 "sub_agg": {
423 "sum": {"field": "tags.requests.numeric_values"}
423 "sum": {"field": "tags.requests.numeric_values"}
424 }
424 }
425 },
425 },
426 "filter": {
426 "filter": {
427 "exists": {"field": "tags.requests.numeric_values"}
427 "exists": {"field": "tags.requests.numeric_values"}
428 },
428 },
429 },
429 },
430 "tolerated": {
430 "tolerated": {
431 "aggs": {
431 "aggs": {
432 "sub_agg": {
432 "sub_agg": {
433 "sum": {"field": "tags.requests.numeric_values"}
433 "sum": {"field": "tags.requests.numeric_values"}
434 }
434 }
435 },
435 },
436 "filter": {
436 "filter": {
437 "bool": {"filter": [
437 "bool": {"filter": [
438 {
438 {
439 "range": {
439 "range": {
440 "tags.main.numeric_values": {"gte": "1"}
440 "tags.main.numeric_values": {"gte": "1"}
441 }
441 }
442 },
442 },
443 {
443 {
444 "range": {
444 "range": {
445 "tags.main.numeric_values": {"lt": "4"}
445 "tags.main.numeric_values": {"lt": "4"}
446 }
446 }
447 },
447 },
448 {
448 {
449 "exists": {
449 "exists": {
450 "field": "tags.requests.numeric_values"
450 "field": "tags.requests.numeric_values"
451 }
451 }
452 },
452 },
453 ]}
453 ]}
454 },
454 },
455 },
455 },
456 },
456 },
457 "terms": {"field": "tags.server_name.values.keyword", "size": 999999},
457 "terms": {"field": "tags.server_name.values.keyword", "size": 999999},
458 }
458 }
459 },
459 },
460 "query": {
460 "query": {
461 "bool": {
461 "bool": {
462 "filter": [
462 "filter": [
463 {
463 {
464 "terms": {
464 "terms": {
465 "resource_id": [filter_settings["resource"][0]]
465 "resource_id": [filter_settings["resource"][0]]
466 }
466 }
467 },
467 },
468 {
468 {
469 "range": {
469 "range": {
470 "timestamp": {
470 "timestamp": {
471 "gte": filter_settings["start_date"],
471 "gte": filter_settings["start_date"],
472 "lte": filter_settings["end_date"],
472 "lte": filter_settings["end_date"],
473 }
473 }
474 }
474 }
475 },
475 },
476 {"terms": {"namespace": ["appenlight.request_metric"]}},
476 {"terms": {"namespace": ["appenlight.request_metric"]}},
477 ]
477 ]
478 }
478 }
479 },
479 },
480 }
480 }
481
481
482 result = Datastores.es.search(
482 result = Datastores.es.search(
483 body=es_query, index=index_names, doc_type="log", size=0
483 body=es_query, index=index_names, doc_type="log", size=0
484 )
484 )
485 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
485 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
486 requests_series.append(
486 requests_series.append(
487 {
487 {
488 "frustrating": bucket["frustrating"]["sub_agg"]["value"],
488 "frustrating": bucket["frustrating"]["sub_agg"]["value"],
489 "main": bucket["main"]["sub_agg"]["value"],
489 "main": bucket["main"]["sub_agg"]["value"],
490 "requests": bucket["requests"]["sub_agg"]["value"],
490 "requests": bucket["requests"]["sub_agg"]["value"],
491 "tolerated": bucket["tolerated"]["sub_agg"]["value"],
491 "tolerated": bucket["tolerated"]["sub_agg"]["value"],
492 "key": bucket["key"],
492 "key": bucket["key"],
493 }
493 }
494 )
494 )
495
495
496 since_when = filter_settings["start_date"]
496 since_when = filter_settings["start_date"]
497 until = filter_settings["end_date"]
497 until = filter_settings["end_date"]
498
498
499 # total errors
499 # total errors
500
500
501 index_names = es_index_name_limiter(
501 index_names = es_index_name_limiter(
502 start_date=filter_settings["start_date"],
502 start_date=filter_settings["start_date"],
503 end_date=filter_settings["end_date"],
503 end_date=filter_settings["end_date"],
504 ixtypes=["reports"],
504 ixtypes=["reports"],
505 )
505 )
506
506
507 report_series = []
507 report_series = []
508 if index_names and filter_settings["resource"]:
508 if index_names and filter_settings["resource"]:
509 report_type = ReportType.key_from_value(ReportType.error)
509 report_type = ReportType.key_from_value(ReportType.error)
510 es_query = {
510 es_query = {
511 "aggs": {
511 "aggs": {
512 "parent_agg": {
512 "parent_agg": {
513 "aggs": {
513 "aggs": {
514 "errors": {
514 "errors": {
515 "aggs": {
515 "aggs": {
516 "sub_agg": {
516 "sub_agg": {
517 "sum": {
517 "sum": {
518 "field": "tags.occurences.numeric_values"
518 "field": "tags.occurences.numeric_values"
519 }
519 }
520 }
520 }
521 },
521 },
522 "filter": {
522 "filter": {
523 "bool": {
523 "bool": {
524 "filter": [
524 "filter": [
525 {"terms": {"tags.type.values": [report_type]}},
525 {"terms": {"tags.type.values": [report_type]}},
526 {
526 {
527 "exists": {
527 "exists": {
528 "field": "tags.occurences.numeric_values"
528 "field": "tags.occurences.numeric_values"
529 }
529 }
530 },
530 },
531 ]
531 ]
532 }
532 }
533 },
533 },
534 }
534 }
535 },
535 },
536 "terms": {"field": "tags.server_name.values.keyword", "size": 999999},
536 "terms": {"field": "tags.server_name.values.keyword", "size": 999999},
537 }
537 }
538 },
538 },
539 "query": {
539 "query": {
540 "bool": {
540 "bool": {
541 "filter": [
541 "filter": [
542 {
542 {
543 "terms": {
543 "terms": {
544 "resource_id": [filter_settings["resource"][0]]
544 "resource_id": [filter_settings["resource"][0]]
545 }
545 }
546 },
546 },
547 {
547 {
548 "range": {
548 "range": {
549 "timestamp": {
549 "timestamp": {
550 "gte": filter_settings["start_date"],
550 "gte": filter_settings["start_date"],
551 "lte": filter_settings["end_date"],
551 "lte": filter_settings["end_date"],
552 }
552 }
553 }
553 }
554 },
554 },
555 {"terms": {"namespace": ["appenlight.error"]}},
555 {"terms": {"namespace": ["appenlight.error"]}},
556 ]
556 ]
557 }
557 }
558 },
558 },
559 }
559 }
560 result = Datastores.es.search(
560 result = Datastores.es.search(
561 body=es_query, index=index_names, doc_type="log", size=0
561 body=es_query, index=index_names, doc_type="log", size=0
562 )
562 )
563 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
563 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
564 report_series.append(
564 report_series.append(
565 {
565 {
566 "key": bucket["key"],
566 "key": bucket["key"],
567 "errors": bucket["errors"]["sub_agg"]["value"],
567 "errors": bucket["errors"]["sub_agg"]["value"],
568 }
568 }
569 )
569 )
570
570
571 stats = {}
571 stats = {}
572 if UptimeMetricService is not None:
572 if UptimeMetricService is not None:
573 uptime = UptimeMetricService.get_uptime_by_app(
573 uptime = UptimeMetricService.get_uptime_by_app(
574 filter_settings["resource"][0], since_when=since_when, until=until
574 filter_settings["resource"][0], since_when=since_when, until=until
575 )
575 )
576 else:
576 else:
577 uptime = 0
577 uptime = 0
578
578
579 total_seconds = (until - since_when).total_seconds()
579 total_seconds = (until - since_when).total_seconds()
580
580
581 for stat in requests_series:
581 for stat in requests_series:
582 check_key(stat["key"], stats, uptime, total_seconds)
582 check_key(stat["key"], stats, uptime, total_seconds)
583 stats[stat["key"]]["requests"] = int(stat["requests"])
583 stats[stat["key"]]["requests"] = int(stat["requests"])
584 stats[stat["key"]]["response_time"] = stat["main"]
584 stats[stat["key"]]["response_time"] = stat["main"]
585 stats[stat["key"]]["tolerated_requests"] = stat["tolerated"]
585 stats[stat["key"]]["tolerated_requests"] = stat["tolerated"]
586 stats[stat["key"]]["frustrating_requests"] = stat["frustrating"]
586 stats[stat["key"]]["frustrating_requests"] = stat["frustrating"]
587 for server in report_series:
587 for server in report_series:
588 check_key(server["key"], stats, uptime, total_seconds)
588 check_key(server["key"], stats, uptime, total_seconds)
589 stats[server["key"]]["errors"] = server["errors"]
589 stats[server["key"]]["errors"] = server["errors"]
590
590
591 server_stats = list(stats.values())
591 server_stats = list(stats.values())
592 for stat in server_stats:
592 for stat in server_stats:
593 stat["satisfying_requests"] = (
593 stat["satisfying_requests"] = (
594 stat["requests"]
594 stat["requests"]
595 - stat["errors"]
595 - stat["errors"]
596 - stat["frustrating_requests"]
596 - stat["frustrating_requests"]
597 - stat["tolerated_requests"]
597 - stat["tolerated_requests"]
598 )
598 )
599 if stat["satisfying_requests"] < 0:
599 if stat["satisfying_requests"] < 0:
600 stat["satisfying_requests"] = 0
600 stat["satisfying_requests"] = 0
601
601
602 if stat["requests"]:
602 if stat["requests"]:
603 stat["avg_response_time"] = round(
603 stat["avg_response_time"] = round(
604 stat["response_time"] / stat["requests"], 3
604 stat["response_time"] / stat["requests"], 3
605 )
605 )
606 qual_requests = (
606 qual_requests = (
607 stat["satisfying_requests"] + stat["tolerated_requests"] / 2.0
607 stat["satisfying_requests"] + stat["tolerated_requests"] / 2.0
608 )
608 )
609 stat["apdex"] = round((qual_requests / stat["requests"]) * 100, 2)
609 stat["apdex"] = round((qual_requests / stat["requests"]) * 100, 2)
610 stat["rpm"] = round(stat["requests"] / stat["total_minutes"], 2)
610 stat["rpm"] = round(stat["requests"] / stat["total_minutes"], 2)
611
611
612 return sorted(server_stats, key=lambda x: x["name"])
612 return sorted(server_stats, key=lambda x: x["name"])
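For illustration, a minimal standalone sketch of the Apdex calculation performed above; the 100x scaling and rounding mirror get_apdex_stats, and the sample numbers are hypothetical:

    def apdex_score(satisfying_requests, tolerated_requests, requests):
        # Apdex = (satisfied + tolerated / 2) / total samples, expressed here as a percentage
        if not requests:
            return 0
        qual_requests = satisfying_requests + tolerated_requests / 2.0
        return round((qual_requests / requests) * 100, 2)

    apdex_score(900, 80, 1000)  # -> 94.0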
@@ -1,127 +1,127 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 #
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
7 # You may obtain a copy of the License at
8 #
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
10 #
11 # Unless required by applicable law or agreed to in writing, software
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
15 # limitations under the License.
16
16
17 import sqlalchemy as sa
17 import sqlalchemy as sa
18 import hashlib
18 import hashlib
19
19
20 from datetime import datetime, timedelta
20 from datetime import datetime, timedelta
21 from appenlight.models import Base
21 from appenlight.models import Base
22 from sqlalchemy.dialects.postgresql import JSON
22 from sqlalchemy.dialects.postgresql import JSON
23 from ziggurat_foundations.models.base import BaseModel
23 from ziggurat_foundations.models.base import BaseModel
24
24
25
25
26 class SlowCall(Base, BaseModel):
26 class SlowCall(Base, BaseModel):
27 __tablename__ = "slow_calls"
27 __tablename__ = "slow_calls"
28 __table_args__ = {"implicit_returning": False}
28 __table_args__ = {"implicit_returning": False}
29
29
30 resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
30 resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
31 id = sa.Column(sa.Integer, nullable=False, primary_key=True)
31 id = sa.Column(sa.Integer, nullable=False, primary_key=True)
32 report_id = sa.Column(
32 report_id = sa.Column(
33 sa.BigInteger,
33 sa.BigInteger,
34 sa.ForeignKey("reports.id", ondelete="cascade", onupdate="cascade"),
34 sa.ForeignKey("reports.id", ondelete="cascade", onupdate="cascade"),
35 primary_key=True,
35 primary_key=True,
36 )
36 )
37 duration = sa.Column(sa.Float(), default=0)
37 duration = sa.Column(sa.Float(), default=0)
38 statement = sa.Column(sa.UnicodeText(), default="")
38 statement = sa.Column(sa.UnicodeText(), default="")
39 statement_hash = sa.Column(sa.Unicode(60), default="")
39 statement_hash = sa.Column(sa.Unicode(60), default="")
40 parameters = sa.Column(JSON(), nullable=False, default=dict)
40 parameters = sa.Column(JSON(), nullable=False, default=dict)
41 type = sa.Column(sa.Unicode(16), default="")
41 type = sa.Column(sa.Unicode(16), default="")
42 subtype = sa.Column(sa.Unicode(16), default=None)
42 subtype = sa.Column(sa.Unicode(16), default=None)
43 location = sa.Column(sa.Unicode(255), default="")
43 location = sa.Column(sa.Unicode(255), default="")
44 timestamp = sa.Column(
44 timestamp = sa.Column(
45 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
45 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
46 )
46 )
47 report_group_time = sa.Column(
47 report_group_time = sa.Column(
48 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
48 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
49 )
49 )
50
50
51 def set_data(
51 def set_data(
52 self, data, protocol_version=None, resource_id=None, report_group=None
52 self, data, protocol_version=None, resource_id=None, report_group=None
53 ):
53 ):
54 self.resource_id = resource_id
54 self.resource_id = resource_id
55 if data.get("start") and data.get("end"):
55 if data.get("start") and data.get("end"):
56 self.timestamp = data.get("start")
56 self.timestamp = data.get("start")
57 d = data.get("end") - data.get("start")
57 d = data.get("end") - data.get("start")
58 self.duration = d.total_seconds()
58 self.duration = d.total_seconds()
59 self.statement = data.get("statement", "")
59 self.statement = data.get("statement", "")
60 self.type = data.get("type", "unknown")[:16]
60 self.type = data.get("type", "unknown")[:16]
61 self.parameters = data.get("parameters", {})
61 self.parameters = data.get("parameters", {})
62 self.location = data.get("location", "")[:255]
62 self.location = data.get("location", "")[:255]
63 self.report_group_time = report_group.first_timestamp
63 self.report_group_time = report_group.first_timestamp
64 if "subtype" in data:
64 if "subtype" in data:
65 self.subtype = data.get("subtype", "unknown")[:16]
65 self.subtype = data.get("subtype", "unknown")[:16]
66 if self.type == "tmpl":
66 if self.type == "tmpl":
67 self.set_hash("{} {}".format(self.statement, self.parameters))
67 self.set_hash("{} {}".format(self.statement, self.parameters))
68 else:
68 else:
69 self.set_hash()
69 self.set_hash()
70
70
71 def set_hash(self, custom_statement=None):
71 def set_hash(self, custom_statement=None):
72 statement = custom_statement or self.statement
72 statement = custom_statement or self.statement
73 self.statement_hash = hashlib.sha1(statement.encode("utf8")).hexdigest()
73 self.statement_hash = hashlib.sha1(statement.encode("utf8")).hexdigest()
74
74
75 @property
75 @property
76 def end_time(self):
76 def end_time(self):
77 if self.duration and self.timestamp:
77 if self.duration and self.timestamp:
78 return self.timestamp + timedelta(seconds=self.duration)
78 return self.timestamp + timedelta(seconds=self.duration)
79 return None
79 return None
80
80
81 def get_dict(self):
81 def get_dict(self):
82 instance_dict = super(SlowCall, self).get_dict()
82 instance_dict = super(SlowCall, self).get_dict()
83 instance_dict["children"] = []
83 instance_dict["children"] = []
84 instance_dict["end_time"] = self.end_time
84 instance_dict["end_time"] = self.end_time
85 return instance_dict
85 return instance_dict
86
86
87 def es_doc(self):
87 def es_doc(self):
88 doc = {
88 doc = {
89 "resource_id": self.resource_id,
89 "resource_id": self.resource_id,
90 "timestamp": self.timestamp,
90 "timestamp": self.timestamp,
91 "pg_id": str(self.id),
91 "slow_call_id": str(self.id),
92 "permanent": False,
92 "permanent": False,
93 "request_id": None,
93 "request_id": None,
94 "log_level": "UNKNOWN",
94 "log_level": "UNKNOWN",
95 "message": self.statement,
95 "message": self.statement,
96 "namespace": "appenlight.slow_call",
96 "namespace": "appenlight.slow_call",
97 "tags": {
97 "tags": {
98 "report_id": {
98 "report_id": {
99 "values": self.report_id,
99 "values": self.report_id,
100 "numeric_values": self.report_id,
100 "numeric_values": self.report_id,
101 },
101 },
102 "duration": {"values": None, "numeric_values": self.duration},
102 "duration": {"values": None, "numeric_values": self.duration},
103 "statement_hash": {
103 "statement_hash": {
104 "values": self.statement_hash,
104 "values": self.statement_hash,
105 "numeric_values": None,
105 "numeric_values": None,
106 },
106 },
107 "type": {"values": self.type, "numeric_values": None},
107 "type": {"values": self.type, "numeric_values": None},
108 "subtype": {"values": self.subtype, "numeric_values": None},
108 "subtype": {"values": self.subtype, "numeric_values": None},
109 "location": {"values": self.location, "numeric_values": None},
109 "location": {"values": self.location, "numeric_values": None},
110 "parameters": {"values": None, "numeric_values": None},
110 "parameters": {"values": None, "numeric_values": None},
111 },
111 },
112 "tag_list": [
112 "tag_list": [
113 "report_id",
113 "report_id",
114 "duration",
114 "duration",
115 "statement_hash",
115 "statement_hash",
116 "type",
116 "type",
117 "subtype",
117 "subtype",
118 "location",
118 "location",
119 ],
119 ],
120 }
120 }
121 if isinstance(self.parameters, str):
121 if isinstance(self.parameters, str):
122 doc["tags"]["parameters"]["values"] = self.parameters[:255]
122 doc["tags"]["parameters"]["values"] = self.parameters[:255]
123 return doc
123 return doc
124
124
125 @property
125 @property
126 def partition_id(self):
126 def partition_id(self):
127 return "rcae_sc_%s" % self.report_group_time.strftime("%Y_%m")
127 return "rcae_sc_%s" % self.report_group_time.strftime("%Y_%m")
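As a usage note, a hedged sketch of how one of these documents could be pushed to its monthly partition; Datastores.es is the client used throughout this codebase, the "log" doc type matches the slow-call template in this changeset, and the slow_call instance itself is hypothetical:

    from appenlight.models import Datastores

    def index_slow_call(slow_call):
        # partition_id resolves to e.g. "rcae_sc_2017_05" from report_group_time
        Datastores.es.index(
            index=slow_call.partition_id,
            doc_type="log",
            body=slow_call.es_doc(),
        )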
@@ -1,458 +1,577 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 #
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
7 # You may obtain a copy of the License at
8 #
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
10 #
11 # Unless required by applicable law or agreed to in writing, software
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
15 # limitations under the License.
16
16
17 import argparse
17 import argparse
18 import datetime
18 import datetime
19 import logging
19 import logging
20 import copy
20
21
21 import sqlalchemy as sa
22 import sqlalchemy as sa
22 import elasticsearch.exceptions
23 import elasticsearch.exceptions
23 import elasticsearch.helpers
24 import elasticsearch.helpers
24
25
25 from collections import defaultdict
26 from collections import defaultdict
26 from pyramid.paster import setup_logging
27 from pyramid.paster import setup_logging
27 from pyramid.paster import bootstrap
28 from pyramid.paster import bootstrap
28 from appenlight.models import DBSession, Datastores, metadata
29 from appenlight.models import DBSession, Datastores, metadata
29 from appenlight.lib import get_callable
30 from appenlight.lib import get_callable
30 from appenlight.models.report_group import ReportGroup
31 from appenlight.models.report_group import ReportGroup
31 from appenlight.models.report import Report
32 from appenlight.models.report import Report
32 from appenlight.models.report_stat import ReportStat
33 from appenlight.models.report_stat import ReportStat
33 from appenlight.models.log import Log
34 from appenlight.models.log import Log
34 from appenlight.models.slow_call import SlowCall
35 from appenlight.models.slow_call import SlowCall
35 from appenlight.models.metric import Metric
36 from appenlight.models.metric import Metric
36
37
37
38 log = logging.getLogger(__name__)
38 log = logging.getLogger(__name__)
39
39
40 tables = {
40 tables = {
41 "slow_calls_p_": [],
41 "slow_calls_p_": [],
42 "reports_stats_p_": [],
42 "reports_stats_p_": [],
43 "reports_p_": [],
43 "reports_p_": [],
44 "reports_groups_p_": [],
44 "reports_groups_p_": [],
45 "logs_p_": [],
45 "logs_p_": [],
46 "metrics_p_": [],
46 "metrics_p_": [],
47 }
47 }
48
48
49
49
50 def detect_tables(table_prefix):
50 def detect_tables(table_prefix):
51 found_tables = []
51 found_tables = []
52 db_tables_query = """
52 db_tables_query = """
53 SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
53 SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
54 tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;"""
54 tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;"""
55
55
56 for table in DBSession.execute(db_tables_query).fetchall():
56 for table in DBSession.execute(db_tables_query).fetchall():
57 tablename = table.tablename
57 tablename = table.tablename
58 if tablename.startswith(table_prefix):
58 if tablename.startswith(table_prefix):
59 t = sa.Table(
59 t = sa.Table(
60 tablename, metadata, autoload=True, autoload_with=DBSession.bind.engine
60 tablename, metadata, autoload=True, autoload_with=DBSession.bind.engine
61 )
61 )
62 found_tables.append(t)
62 found_tables.append(t)
63 return found_tables
63 return found_tables
64
64
65
65
66 def main():
66 def main():
67 """
67 """
68 Recreates Elasticsearch indexes
68 Recreates Elasticsearch indexes
69 Performs reindex of whole db to Elasticsearch
69 Performs reindex of whole db to Elasticsearch
70
70
71 """
71 """
72
72
73 # need parser twice because we first need to load ini file
73 # need parser twice because we first need to load ini file
74 # bootstrap pyramid and then load plugins
74 # bootstrap pyramid and then load plugins
75 pre_parser = argparse.ArgumentParser(
75 pre_parser = argparse.ArgumentParser(
76 description="Reindex AppEnlight data", add_help=False
76 description="Reindex AppEnlight data", add_help=False
77 )
77 )
78 pre_parser.add_argument(
78 pre_parser.add_argument(
79 "-c", "--config", required=True, help="Configuration ini file of application"
79 "-c", "--config", required=True, help="Configuration ini file of application"
80 )
80 )
81 pre_parser.add_argument("-h", "--help", help="Show help", nargs="?")
81 pre_parser.add_argument("-h", "--help", help="Show help", nargs="?")
82 pre_parser.add_argument(
82 pre_parser.add_argument(
83 "-t", "--types", nargs="+", help="Which parts of database should get reindexed"
83 "-t", "--types", nargs="+", help="Which parts of database should get reindexed"
84 )
84 )
85 args = pre_parser.parse_args()
85 args = pre_parser.parse_args()
86
86
87 config_uri = args.config
87 config_uri = args.config
88 setup_logging(config_uri)
88 setup_logging(config_uri)
89 log.setLevel(logging.INFO)
89 log.setLevel(logging.INFO)
90 env = bootstrap(config_uri)
90 env = bootstrap(config_uri)
91 parser = argparse.ArgumentParser(description="Reindex AppEnlight data")
91 parser = argparse.ArgumentParser(description="Reindex AppEnlight data")
92 choices = {
92 choices = {
93 "reports": "appenlight.scripts.reindex_elasticsearch:reindex_reports",
93 "reports": "appenlight.scripts.reindex_elasticsearch:reindex_reports",
94 "logs": "appenlight.scripts.reindex_elasticsearch:reindex_logs",
94 "logs": "appenlight.scripts.reindex_elasticsearch:reindex_logs",
95 "metrics": "appenlight.scripts.reindex_elasticsearch:reindex_metrics",
95 "metrics": "appenlight.scripts.reindex_elasticsearch:reindex_metrics",
96 "slow_calls": "appenlight.scripts.reindex_elasticsearch:reindex_slow_calls",
96 "slow_calls": "appenlight.scripts.reindex_elasticsearch:reindex_slow_calls",
97 "template": "appenlight.scripts.reindex_elasticsearch:update_template",
97 "template": "appenlight.scripts.reindex_elasticsearch:update_template",
98 }
98 }
99 for k, v in env["registry"].appenlight_plugins.items():
99 for k, v in env["registry"].appenlight_plugins.items():
100 if v.get("fulltext_indexer"):
100 if v.get("fulltext_indexer"):
101 choices[k] = v["fulltext_indexer"]
101 choices[k] = v["fulltext_indexer"]
102 parser.add_argument(
102 parser.add_argument(
103 "-t",
103 "-t",
104 "--types",
104 "--types",
105 nargs="*",
105 nargs="*",
106 choices=["all"] + list(choices.keys()),
106 choices=["all"] + list(choices.keys()),
107 default=[],
107 default=[],
108 help="Which parts of database should get reindexed",
108 help="Which parts of database should get reindexed",
109 )
109 )
110 parser.add_argument(
110 parser.add_argument(
111 "-c", "--config", required=True, help="Configuration ini file of application"
111 "-c", "--config", required=True, help="Configuration ini file of application"
112 )
112 )
113 args = parser.parse_args()
113 args = parser.parse_args()
114
114
115 if "all" in args.types:
115 if "all" in args.types:
116 args.types = list(choices.keys())
116 args.types = list(choices.keys())
117
117
118 print("Selected types to reindex: {}".format(args.types))
118 print("Selected types to reindex: {}".format(args.types))
119
119
120 log.info("settings {}".format(args.types))
120 log.info("settings {}".format(args.types))
121
121
122 if "template" in args.types:
122 if "template" in args.types:
123 get_callable(choices["template"])()
123 get_callable(choices["template"])()
124 args.types.remove("template")
124 args.types.remove("template")
125 for selected in args.types:
125 for selected in args.types:
126 get_callable(choices[selected])()
126 get_callable(choices[selected])()
127
127
128
128
129 def update_template():
129 def update_template():
130 try:
130 try:
131 Datastores.es.indices.delete_template("rcae")
131 Datastores.es.indices.delete_template("rcae_reports")
132 except elasticsearch.exceptions.NotFoundError as e:
133 log.error(e)
134
135 try:
136 Datastores.es.indices.delete_template("rcae_logs")
137 except elasticsearch.exceptions.NotFoundError as e:
138 log.error(e)
139 try:
140 Datastores.es.indices.delete_template("rcae_slow_calls")
141 except elasticsearch.exceptions.NotFoundError as e:
142 log.error(e)
143 try:
144 Datastores.es.indices.delete_template("rcae_metrics")
132 except elasticsearch.exceptions.NotFoundError as e:
145 except elasticsearch.exceptions.NotFoundError as e:
133 log.error(e)
146 log.error(e)
134 log.info("updating elasticsearch template")
147 log.info("updating elasticsearch template")
135 tag_templates = [
148 tag_templates = [
136 {
149 {
137 "values": {
150 "values": {
138 "path_match": "tags.*",
151 "path_match": "tags.*",
139 "mapping": {
152 "mapping": {
140 "type": "object",
153 "type": "object",
141 "properties": {
154 "properties": {
142 "values": {"type": "text", "analyzer": "tag_value",
155 "values": {"type": "text", "analyzer": "tag_value",
143 "fields": {
156 "fields": {
144 "keyword": {
157 "keyword": {
145 "type": "keyword",
158 "type": "keyword",
146 "ignore_above": 256
159 "ignore_above": 256
147 }
160 }
148 }},
161 }},
149 "numeric_values": {"type": "float"},
162 "numeric_values": {"type": "float"},
150 },
163 },
151 },
164 },
152 }
165 }
153 }
166 }
154 ]
167 ]
155
168
156 template_schema = {
169 shared_analysis = {
157 "template": "rcae_*",
170 "analyzer": {
171 "url_path": {
172 "type": "custom",
173 "char_filter": [],
174 "tokenizer": "path_hierarchy",
175 "filter": [],
176 },
177 "tag_value": {
178 "type": "custom",
179 "char_filter": [],
180 "tokenizer": "keyword",
181 "filter": ["lowercase"],
182 },
183 }
184 }
185
186 shared_log_mapping = {
187 "_all": {"enabled": False},
188 "dynamic_templates": tag_templates,
189 "properties": {
190 "pg_id": {"type": "keyword", "index": True},
191 "delete_hash": {"type": "keyword", "index": True},
192 "resource_id": {"type": "integer"},
193 "timestamp": {"type": "date"},
194 "permanent": {"type": "boolean"},
195 "request_id": {"type": "keyword", "index": True},
196 "log_level": {"type": "text", "analyzer": "simple"},
197 "message": {"type": "text", "analyzer": "simple"},
198 "namespace": {
199 "type": "text",
200 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
201 },
202 "tags": {"type": "object"},
203 "tag_list": {"type": "text", "analyzer": "tag_value",
204 "fields": {
205 "keyword": {
206 "type": "keyword",
207 "ignore_above": 256
208 }
209 }},
210 },
211 }
212
213 report_schema = {
214 "template": "rcae_r_*",
158 "settings": {
215 "settings": {
159 "index": {
216 "index": {
160 "refresh_interval": "5s",
217 "refresh_interval": "5s",
161 "translog": {"sync_interval": "5s", "durability": "async"},
218 "translog": {"sync_interval": "5s", "durability": "async"},
219 "mapping": {"single_type": True}
162 },
220 },
163 "number_of_shards": 5,
221 "number_of_shards": 5,
164 "analysis": {
222 "analysis": shared_analysis,
165 "analyzer": {
166 "url_path": {
167 "type": "custom",
168 "char_filter": [],
169 "tokenizer": "path_hierarchy",
170 "filter": [],
171 },
172 "tag_value": {
173 "type": "custom",
174 "char_filter": [],
175 "tokenizer": "keyword",
176 "filter": ["lowercase"],
177 },
178 }
179 },
180 },
223 },
181 "mappings": {
224 "mappings": {
182 "report_group": {
225 "report": {
183 "_all": {"enabled": False},
226 "_all": {"enabled": False},
184 "dynamic_templates": tag_templates,
227 "dynamic_templates": tag_templates,
185 "properties": {
228 "properties": {
186 "pg_id": {"type": "keyword", "index": True},
229 "type": {"type": "keyword", "index": True},
230 # report group
231 "group_id": {"type": "keyword", "index": True},
187 "resource_id": {"type": "integer"},
232 "resource_id": {"type": "integer"},
188 "priority": {"type": "integer"},
233 "priority": {"type": "integer"},
189 "error": {"type": "text", "analyzer": "simple"},
234 "error": {"type": "text", "analyzer": "simple"},
190 "read": {"type": "boolean"},
235 "read": {"type": "boolean"},
191 "occurences": {"type": "integer"},
236 "occurences": {"type": "integer"},
192 "fixed": {"type": "boolean"},
237 "fixed": {"type": "boolean"},
193 "first_timestamp": {"type": "date"},
238 "first_timestamp": {"type": "date"},
194 "last_timestamp": {"type": "date"},
239 "last_timestamp": {"type": "date"},
195 "average_duration": {"type": "float"},
240 "average_duration": {"type": "float"},
196 "summed_duration": {"type": "float"},
241 "summed_duration": {"type": "float"},
197 "public": {"type": "boolean"},
242 "public": {"type": "boolean"},
198 },
243 # report
199 },
244
200 "report": {
245 "report_id": {"type": "keyword", "index": True},
201 "_all": {"enabled": False},
202 "dynamic_templates": tag_templates,
203 "properties": {
204 "pg_id": {"type": "keyword", "index": True},
205 "resource_id": {"type": "integer"},
206 "group_id": {"type": "keyword"},
207 "http_status": {"type": "integer"},
246 "http_status": {"type": "integer"},
208 "ip": {"type": "keyword", "index": True},
247 "ip": {"type": "keyword", "index": True},
209 "url_domain": {"type": "text", "analyzer": "simple"},
248 "url_domain": {"type": "text", "analyzer": "simple"},
210 "url_path": {"type": "text", "analyzer": "url_path"},
249 "url_path": {"type": "text", "analyzer": "url_path"},
211 "error": {"type": "text", "analyzer": "simple"},
212 "report_type": {"type": "integer"},
250 "report_type": {"type": "integer"},
213 "start_time": {"type": "date"},
251 "start_time": {"type": "date"},
214 "request_id": {"type": "keyword", "index": True},
252 "request_id": {"type": "keyword", "index": True},
215 "end_time": {"type": "date"},
253 "end_time": {"type": "date"},
216 "duration": {"type": "float"},
254 "duration": {"type": "float"},
217 "tags": {"type": "object"},
255 "tags": {"type": "object"},
218 "tag_list": {"type": "text", "analyzer": "tag_value",
256 "tag_list": {"type": "text", "analyzer": "tag_value",
219 "fields": {
257 "fields": {
220 "keyword": {
258 "keyword": {
221 "type": "keyword",
259 "type": "keyword",
222 "ignore_above": 256
260 "ignore_above": 256
223 }
261 }
224 }},
262 }},
225 "extra": {"type": "object"},
263 "extra": {"type": "object"},
226 },
264
227 "_parent": {"type": "report_group"},
265 # report stats
228 },
266
229 "log": {
267 "report_stat_id": {"type": "keyword", "index": True},
230 "_all": {"enabled": False},
231 "dynamic_templates": tag_templates,
232 "properties": {
233 "pg_id": {"type": "keyword", "index": True},
234 "delete_hash": {"type": "keyword", "index": True},
235 "resource_id": {"type": "integer"},
236 "timestamp": {"type": "date"},
268 "timestamp": {"type": "date"},
237 "permanent": {"type": "boolean"},
269 "permanent": {"type": "boolean"},
238 "request_id": {"type": "keyword", "index": True},
239 "log_level": {"type": "text", "analyzer": "simple"},
270 "log_level": {"type": "text", "analyzer": "simple"},
240 "message": {"type": "text", "analyzer": "simple"},
271 "message": {"type": "text", "analyzer": "simple"},
241 "namespace": {
272 "namespace": {
242 "type": "text",
273 "type": "text",
243 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
274 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
244 },
275 },
245 "tags": {"type": "object"},
276
246 "tag_list": {"type": "text", "analyzer": "tag_value",
277 "join_field": {
247 "fields": {
278 "type": "join",
248 "keyword": {
279 "relations": {
249 "type": "keyword",
280 "report_group": ["report", "report_stat"]
250 "ignore_above": 256
281 }
251 }
282 }
252 }},
283
253 },
284 },
285 }
286 }
287 }
288
289 Datastores.es.indices.put_template("rcae_reports", body=report_schema)
290
291 logs_mapping = copy.deepcopy(shared_log_mapping)
292 logs_mapping["properties"]["log_id"] = logs_mapping["properties"]["pg_id"]
293 del logs_mapping["properties"]["pg_id"]
294
295 log_template = {
296 "template": "rcae_l_*",
297 "settings": {
298 "index": {
299 "refresh_interval": "5s",
300 "translog": {"sync_interval": "5s", "durability": "async"},
301 "mapping": {"single_type": True}
254 },
302 },
303 "number_of_shards": 5,
304 "analysis": shared_analysis,
305 },
306 "mappings": {
307 "log": logs_mapping,
255 },
308 },
256 }
309 }
257
310
258 Datastores.es.indices.put_template("rcae", body=template_schema)
311 Datastores.es.indices.put_template("rcae_logs", body=log_template)
312
313 slow_call_mapping = copy.deepcopy(shared_log_mapping)
314 slow_call_mapping["properties"]["slow_call_id"] = slow_call_mapping["properties"]["pg_id"]
315 del slow_call_mapping["properties"]["pg_id"]
316
317 slow_call_template = {
318 "template": "rcae_sc_*",
319 "settings": {
320 "index": {
321 "refresh_interval": "5s",
322 "translog": {"sync_interval": "5s", "durability": "async"},
323 "mapping": {"single_type": True}
324 },
325 "number_of_shards": 5,
326 "analysis": shared_analysis,
327 },
328 "mappings": {
329 "log": slow_call_mapping,
330 },
331 }
332
333 Datastores.es.indices.put_template("rcae_slow_calls", body=slow_call_template)
334
335 metric_mapping = copy.deepcopy(shared_log_mapping)
336 metric_mapping["properties"]["metric_id"] = metric_mapping["properties"]["pg_id"]
337 del metric_mapping["properties"]["pg_id"]
338
339 metrics_template = {
340 "template": "rcae_m_*",
341 "settings": {
342 "index": {
343 "refresh_interval": "5s",
344 "translog": {"sync_interval": "5s", "durability": "async"},
345 "mapping": {"single_type": True}
346 },
347 "number_of_shards": 5,
348 "analysis": shared_analysis,
349 },
350 "mappings": {
351 "log": metric_mapping,
352 },
353 }
354
355 Datastores.es.indices.put_template("rcae_metrics", body=metrics_template)
356
357 uptime_metric_mapping = copy.deepcopy(shared_log_mapping)
358 uptime_metric_mapping["properties"]["uptime_id"] = uptime_metric_mapping["properties"]["pg_id"]
359 del uptime_metric_mapping["properties"]["pg_id"]
360
361 uptime_metrics_template = {
362 "template": "rcae_uptime_ce_*",
363 "settings": {
364 "index": {
365 "refresh_interval": "5s",
366 "translog": {"sync_interval": "5s", "durability": "async"},
367 "mapping": {"single_type": True}
368 },
369 "number_of_shards": 5,
370 "analysis": shared_analysis,
371 },
372 "mappings": {
373 "log": shared_log_mapping,
374 },
375 }
376
377 Datastores.es.indices.put_template("rcae_uptime_metrics", body=uptime_metrics_template)
259
378
260
379
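For context, a hedged sketch of how a child "report" document could be written against the join mapping defined above; the join_field name and the report_group/report relation come from report_schema, while the report object, its ids and the partition index name are assumptions:

    # assumption: parent (report_group) and child (report) docs live in the same index,
    # so the child is routed by its group id to land on the parent's shard
    doc = report.es_doc()
    doc["join_field"] = {"name": "report", "parent": str(report.group_id)}
    Datastores.es.index(
        index=report.partition_id,    # e.g. "rcae_r_2017_05" (hypothetical)
        doc_type="report",            # single doc type per reports index
        id=str(report.id),
        routing=str(report.group_id),
        body=doc,
    )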
261 def reindex_reports():
380 def reindex_reports():
262 reports_groups_tables = detect_tables("reports_groups_p_")
381 reports_groups_tables = detect_tables("reports_groups_p_")
263 try:
382 try:
264 Datastores.es.indices.delete("rcae_r*")
383 Datastores.es.indices.delete("rcae_r_*")
265 except elasticsearch.exceptions.NotFoundError as e:
384 except elasticsearch.exceptions.NotFoundError as e:
266 log.error(e)
385 log.error(e)
267
386
268 log.info("reindexing report groups")
387 log.info("reindexing report groups")
269 i = 0
388 i = 0
270 task_start = datetime.datetime.now()
389 task_start = datetime.datetime.now()
271 for partition_table in reports_groups_tables:
390 for partition_table in reports_groups_tables:
272 conn = DBSession.connection().execution_options(stream_results=True)
391 conn = DBSession.connection().execution_options(stream_results=True)
273 result = conn.execute(partition_table.select())
392 result = conn.execute(partition_table.select())
274 while True:
393 while True:
275 chunk = result.fetchmany(2000)
394 chunk = result.fetchmany(2000)
276 if not chunk:
395 if not chunk:
277 break
396 break
278 es_docs = defaultdict(list)
397 es_docs = defaultdict(list)
279 for row in chunk:
398 for row in chunk:
280 i += 1
399 i += 1
281 item = ReportGroup(**dict(list(row.items())))
400 item = ReportGroup(**dict(list(row.items())))
282 d_range = item.partition_id
401 d_range = item.partition_id
283 es_docs[d_range].append(item.es_doc())
402 es_docs[d_range].append(item.es_doc())
284 if es_docs:
403 if es_docs:
285 name = partition_table.name
404 name = partition_table.name
286 log.info("round {}, {}".format(i, name))
405 log.info("round {}, {}".format(i, name))
287 for k, v in es_docs.items():
406 for k, v in es_docs.items():
288 to_update = {"_index": k, "_type": "report_group"}
407 to_update = {"_index": k, "_type": "report"}
289 [i.update(to_update) for i in v]
408 [i.update(to_update) for i in v]
290 elasticsearch.helpers.bulk(Datastores.es, v)
409 elasticsearch.helpers.bulk(Datastores.es, v)
291
410
292 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
411 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
293
412
294 i = 0
413 i = 0
295 log.info("reindexing reports")
414 log.info("reindexing reports")
296 task_start = datetime.datetime.now()
415 task_start = datetime.datetime.now()
297 reports_tables = detect_tables("reports_p_")
416 reports_tables = detect_tables("reports_p_")
298 for partition_table in reports_tables:
417 for partition_table in reports_tables:
299 conn = DBSession.connection().execution_options(stream_results=True)
418 conn = DBSession.connection().execution_options(stream_results=True)
300 result = conn.execute(partition_table.select())
419 result = conn.execute(partition_table.select())
301 while True:
420 while True:
302 chunk = result.fetchmany(2000)
421 chunk = result.fetchmany(2000)
303 if not chunk:
422 if not chunk:
304 break
423 break
305 es_docs = defaultdict(list)
424 es_docs = defaultdict(list)
306 for row in chunk:
425 for row in chunk:
307 i += 1
426 i += 1
308 item = Report(**dict(list(row.items())))
427 item = Report(**dict(list(row.items())))
309 d_range = item.partition_id
428 d_range = item.partition_id
310 es_docs[d_range].append(item.es_doc())
429 es_docs[d_range].append(item.es_doc())
311 if es_docs:
430 if es_docs:
312 name = partition_table.name
431 name = partition_table.name
313 log.info("round {}, {}".format(i, name))
432 log.info("round {}, {}".format(i, name))
314 for k, v in es_docs.items():
433 for k, v in es_docs.items():
315 to_update = {"_index": k, "_type": "report"}
434 to_update = {"_index": k, "_type": "report"}
316 [i.update(to_update) for i in v]
435 [i.update(to_update) for i in v]
317 elasticsearch.helpers.bulk(Datastores.es, v)
436 elasticsearch.helpers.bulk(Datastores.es, v)
318
437
319 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
438 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
320
439
321 log.info("reindexing reports stats")
440 log.info("reindexing reports stats")
322 i = 0
441 i = 0
323 task_start = datetime.datetime.now()
442 task_start = datetime.datetime.now()
324 reports_stats_tables = detect_tables("reports_stats_p_")
443 reports_stats_tables = detect_tables("reports_stats_p_")
325 for partition_table in reports_stats_tables:
444 for partition_table in reports_stats_tables:
326 conn = DBSession.connection().execution_options(stream_results=True)
445 conn = DBSession.connection().execution_options(stream_results=True)
327 result = conn.execute(partition_table.select())
446 result = conn.execute(partition_table.select())
328 while True:
447 while True:
329 chunk = result.fetchmany(2000)
448 chunk = result.fetchmany(2000)
330 if not chunk:
449 if not chunk:
331 break
450 break
332 es_docs = defaultdict(list)
451 es_docs = defaultdict(list)
333 for row in chunk:
452 for row in chunk:
334 rd = dict(list(row.items()))
453 rd = dict(list(row.items()))
335 # remove legacy columns
454 # remove legacy columns
336 # TODO: remove the column later
455 # TODO: remove the column later
337 rd.pop("size", None)
456 rd.pop("size", None)
338 item = ReportStat(**rd)
457 item = ReportStat(**rd)
339 i += 1
458 i += 1
340 d_range = item.partition_id
459 d_range = item.partition_id
341 es_docs[d_range].append(item.es_doc())
460 es_docs[d_range].append(item.es_doc())
342 if es_docs:
461 if es_docs:
343 name = partition_table.name
462 name = partition_table.name
344 log.info("round {}, {}".format(i, name))
463 log.info("round {}, {}".format(i, name))
345 for k, v in es_docs.items():
464 for k, v in es_docs.items():
346 to_update = {"_index": k, "_type": "log"}
465 to_update = {"_index": k, "_type": "report"}
347 [i.update(to_update) for i in v]
466 [i.update(to_update) for i in v]
348 elasticsearch.helpers.bulk(Datastores.es, v)
467 elasticsearch.helpers.bulk(Datastores.es, v)
349
468
350 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
469 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
351
470
352
471
353 def reindex_logs():
472 def reindex_logs():
354 try:
473 try:
355 Datastores.es.indices.delete("rcae_l*")
474 Datastores.es.indices.delete("rcae_l_*")
356 except elasticsearch.exceptions.NotFoundError as e:
475 except elasticsearch.exceptions.NotFoundError as e:
357 log.error(e)
476 log.error(e)
358
477
359 # logs
478 # logs
360 log.info("reindexing logs")
479 log.info("reindexing logs")
361 i = 0
480 i = 0
362 task_start = datetime.datetime.now()
481 task_start = datetime.datetime.now()
363 log_tables = detect_tables("logs_p_")
482 log_tables = detect_tables("logs_p_")
364 for partition_table in log_tables:
483 for partition_table in log_tables:
365 conn = DBSession.connection().execution_options(stream_results=True)
484 conn = DBSession.connection().execution_options(stream_results=True)
366 result = conn.execute(partition_table.select())
485 result = conn.execute(partition_table.select())
367 while True:
486 while True:
368 chunk = result.fetchmany(2000)
487 chunk = result.fetchmany(2000)
369 if not chunk:
488 if not chunk:
370 break
489 break
371 es_docs = defaultdict(list)
490 es_docs = defaultdict(list)
372
491
373 for row in chunk:
492 for row in chunk:
374 i += 1
493 i += 1
375 item = Log(**dict(list(row.items())))
494 item = Log(**dict(list(row.items())))
376 d_range = item.partition_id
495 d_range = item.partition_id
377 es_docs[d_range].append(item.es_doc())
496 es_docs[d_range].append(item.es_doc())
378 if es_docs:
497 if es_docs:
379 name = partition_table.name
498 name = partition_table.name
380 log.info("round {}, {}".format(i, name))
499 log.info("round {}, {}".format(i, name))
381 for k, v in es_docs.items():
500 for k, v in es_docs.items():
382 to_update = {"_index": k, "_type": "log"}
501 to_update = {"_index": k, "_type": "log"}
383 [i.update(to_update) for i in v]
502 [i.update(to_update) for i in v]
384 elasticsearch.helpers.bulk(Datastores.es, v)
503 elasticsearch.helpers.bulk(Datastores.es, v)
385
504
386 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
505 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
387
506
388
507
389 def reindex_metrics():
508 def reindex_metrics():
390 try:
509 try:
391 Datastores.es.indices.delete("rcae_m*")
510 Datastores.es.indices.delete("rcae_m_*")
392 except elasticsearch.exceptions.NotFoundError as e:
511 except elasticsearch.exceptions.NotFoundError as e:
393 log.error(e)
512 log.error(e)
394
513
395 log.info("reindexing applications metrics")
514 log.info("reindexing applications metrics")
396 i = 0
515 i = 0
397 task_start = datetime.datetime.now()
516 task_start = datetime.datetime.now()
398 metric_tables = detect_tables("metrics_p_")
517 metric_tables = detect_tables("metrics_p_")
399 for partition_table in metric_tables:
518 for partition_table in metric_tables:
400 conn = DBSession.connection().execution_options(stream_results=True)
519 conn = DBSession.connection().execution_options(stream_results=True)
401 result = conn.execute(partition_table.select())
520 result = conn.execute(partition_table.select())
402 while True:
521 while True:
403 chunk = result.fetchmany(2000)
522 chunk = result.fetchmany(2000)
404 if not chunk:
523 if not chunk:
405 break
524 break
406 es_docs = defaultdict(list)
525 es_docs = defaultdict(list)
407 for row in chunk:
526 for row in chunk:
408 i += 1
527 i += 1
409 item = Metric(**dict(list(row.items())))
528 item = Metric(**dict(list(row.items())))
410 d_range = item.partition_id
529 d_range = item.partition_id
411 es_docs[d_range].append(item.es_doc())
530 es_docs[d_range].append(item.es_doc())
412 if es_docs:
531 if es_docs:
413 name = partition_table.name
532 name = partition_table.name
414 log.info("round {}, {}".format(i, name))
533 log.info("round {}, {}".format(i, name))
415 for k, v in es_docs.items():
534 for k, v in es_docs.items():
416 to_update = {"_index": k, "_type": "log"}
535 to_update = {"_index": k, "_type": "log"}
417 [i.update(to_update) for i in v]
536 [i.update(to_update) for i in v]
418 elasticsearch.helpers.bulk(Datastores.es, v)
537 elasticsearch.helpers.bulk(Datastores.es, v)
419
538
420 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
539 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
421
540
422
541
423 def reindex_slow_calls():
542 def reindex_slow_calls():
424 try:
543 try:
425 Datastores.es.indices.delete("rcae_sc*")
544 Datastores.es.indices.delete("rcae_sc_*")
426 except elasticsearch.exceptions.NotFoundError as e:
545 except elasticsearch.exceptions.NotFoundError as e:
427 log.error(e)
546 log.error(e)
428
547
429 log.info("reindexing slow calls")
548 log.info("reindexing slow calls")
430 i = 0
549 i = 0
431 task_start = datetime.datetime.now()
550 task_start = datetime.datetime.now()
432 slow_calls_tables = detect_tables("slow_calls_p_")
551 slow_calls_tables = detect_tables("slow_calls_p_")
433 for partition_table in slow_calls_tables:
552 for partition_table in slow_calls_tables:
434 conn = DBSession.connection().execution_options(stream_results=True)
553 conn = DBSession.connection().execution_options(stream_results=True)
435 result = conn.execute(partition_table.select())
554 result = conn.execute(partition_table.select())
436 while True:
555 while True:
437 chunk = result.fetchmany(2000)
556 chunk = result.fetchmany(2000)
438 if not chunk:
557 if not chunk:
439 break
558 break
440 es_docs = defaultdict(list)
559 es_docs = defaultdict(list)
441 for row in chunk:
560 for row in chunk:
442 i += 1
561 i += 1
443 item = SlowCall(**dict(list(row.items())))
562 item = SlowCall(**dict(list(row.items())))
444 d_range = item.partition_id
563 d_range = item.partition_id
445 es_docs[d_range].append(item.es_doc())
564 es_docs[d_range].append(item.es_doc())
446 if es_docs:
565 if es_docs:
447 name = partition_table.name
566 name = partition_table.name
448 log.info("round {}, {}".format(i, name))
567 log.info("round {}, {}".format(i, name))
449 for k, v in es_docs.items():
568 for k, v in es_docs.items():
450 to_update = {"_index": k, "_type": "log"}
569 to_update = {"_index": k, "_type": "log"}
451 [i.update(to_update) for i in v]
570 [i.update(to_update) for i in v]
452 elasticsearch.helpers.bulk(Datastores.es, v)
571 elasticsearch.helpers.bulk(Datastores.es, v)
453
572
454 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
573 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
455
574
456
575
457 if __name__ == "__main__":
576 if __name__ == "__main__":
458 main()
577 main()
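
For context, a minimal standalone sketch (not part of this changeset) of the bulk-indexing pattern the reindex loops rely on: each pre-built document dict is tagged with its target "_index" and the single "_type" before the whole batch is handed to elasticsearch.helpers.bulk, which treats the underscore-prefixed keys as action metadata and the remaining keys as the document source. The client, index name, and sample fields below are assumptions for illustration only.

    import elasticsearch
    import elasticsearch.helpers

    # assumed client pointing at a local node; adjust to the deployment at hand
    es = elasticsearch.Elasticsearch()

    # hypothetical per-partition target index, analogous to item.partition_id above
    target_index = "rcae_r_2017_10"

    # documents as produced by item.es_doc() in the loops above (illustrative fields)
    docs = [
        {"error": "IndexError", "http_status": 500},
        {"error": "KeyError", "http_status": 500},
    ]

    # tag every doc with routing metadata, mirroring the to_update/update() step
    for doc in docs:
        doc.update({"_index": target_index, "_type": "report"})

    # helpers.bulk() splits the metadata keys from the source body for each action
    elasticsearch.helpers.bulk(es, docs)
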