normalize bool.filter format for elasticsearch 5.x
ergo
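Background for the diff below: Elasticsearch 5.x removed the legacy "and"/"or" filter wrappers, so a bool query's "filter" clause is now written as a plain list of filter clauses. A minimal before/after sketch of the transformation applied throughout this commit (field name taken from the diff, the value is illustrative):

# old 2.x-era format with the "and" wrapper, no longer accepted by Elasticsearch 5.x
old_query = {"query": {"bool": {"filter": {"and": [{"term": {"resource_id": 42}}]}}}}

# normalized format used in this commit: bool.filter holds the clause list directly
new_query = {"query": {"bool": {"filter": [{"term": {"resource_id": 42}}]}}}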
@@ -1,708 +1,708 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import bisect
18 18 import collections
19 19 import math
20 20 from datetime import datetime, timedelta
21 21
22 22 import sqlalchemy as sa
23 23 import elasticsearch.exceptions
24 24 import elasticsearch.helpers
25 25
26 26 from celery.utils.log import get_task_logger
27 27 from zope.sqlalchemy import mark_changed
28 28 from pyramid.threadlocal import get_current_request, get_current_registry
29 29 from ziggurat_foundations.models.services.resource import ResourceService
30 30
31 31 from appenlight.celery import celery
32 32 from appenlight.models.report_group import ReportGroup
33 33 from appenlight.models import DBSession, Datastores
34 34 from appenlight.models.report import Report
35 35 from appenlight.models.log import Log
36 36 from appenlight.models.metric import Metric
37 37 from appenlight.models.event import Event
38 38
39 39 from appenlight.models.services.application import ApplicationService
40 40 from appenlight.models.services.event import EventService
41 41 from appenlight.models.services.log import LogService
42 42 from appenlight.models.services.report import ReportService
43 43 from appenlight.models.services.report_group import ReportGroupService
44 44 from appenlight.models.services.user import UserService
45 45 from appenlight.models.tag import Tag
46 46 from appenlight.lib import print_traceback
47 47 from appenlight.lib.utils import parse_proto, in_batches
48 48 from appenlight.lib.ext_json import json
49 49 from appenlight.lib.redis_keys import REDIS_KEYS
50 50 from appenlight.lib.enums import ReportType
51 51
52 52 log = get_task_logger(__name__)
53 53
54 54 sample_boundries = (
55 55 list(range(100, 1000, 100))
56 56 + list(range(1000, 10000, 1000))
57 57 + list(range(10000, 100000, 5000))
58 58 )
59 59
60 60
61 61 def pick_sample(total_occurences, report_type=None):
62 62 every = 1.0
63 63 position = bisect.bisect_left(sample_boundries, total_occurences)
64 64 if position > 0:
65 65 if report_type == ReportType.not_found:
66 66 divide = 10.0
67 67 else:
68 68 divide = 100.0
69 69 every = sample_boundries[position - 1] / divide
70 70 return total_occurences % every == 0
71 71
72 72
73 73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
74 74 def test_exception_task():
75 75 log.error("test celery log", extra={"location": "celery"})
76 76 log.warning("test celery log", extra={"location": "celery"})
77 77 raise Exception("Celery exception test")
78 78
79 79
80 80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
81 81 def test_retry_exception_task():
82 82 try:
83 83 import time
84 84
85 85 time.sleep(1.3)
86 86 log.error("test retry celery log", extra={"location": "celery"})
87 87 log.warning("test retry celery log", extra={"location": "celery"})
88 88 raise Exception("Celery exception test")
89 89 except Exception as exc:
90 90 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
91 91 raise
92 92 test_retry_exception_task.retry(exc=exc)
93 93
94 94
95 95 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
96 96 def add_reports(resource_id, request_params, dataset, **kwargs):
97 97 proto_version = parse_proto(request_params.get("protocol_version", ""))
98 98 current_time = datetime.utcnow().replace(second=0, microsecond=0)
99 99 try:
100 100 # we will store solr docs here for single insert
101 101 es_report_docs = {}
102 102 es_report_group_docs = {}
103 103 resource = ApplicationService.by_id(resource_id)
104 104
105 105 tags = []
106 106 es_slow_calls_docs = {}
107 107 es_reports_stats_rows = {}
108 108 for report_data in dataset:
109 109 # build report details for later
110 110 added_details = 0
111 111 report = Report()
112 112 report.set_data(report_data, resource, proto_version)
113 113 report._skip_ft_index = True
114 114
115 115 # find latest group in this month's partition
116 116 report_group = ReportGroupService.by_hash_and_resource(
117 117 report.resource_id,
118 118 report.grouping_hash,
119 119 since_when=datetime.utcnow().date().replace(day=1),
120 120 )
121 121 occurences = report_data.get("occurences", 1)
122 122 if not report_group:
123 123 # total reports will be +1 moment later
124 124 report_group = ReportGroup(
125 125 grouping_hash=report.grouping_hash,
126 126 occurences=0,
127 127 total_reports=0,
128 128 last_report=0,
129 129 priority=report.priority,
130 130 error=report.error,
131 131 first_timestamp=report.start_time,
132 132 )
133 133 report_group._skip_ft_index = True
134 134 report_group.report_type = report.report_type
135 135 report.report_group_time = report_group.first_timestamp
136 136 add_sample = pick_sample(
137 137 report_group.occurences, report_type=report_group.report_type
138 138 )
139 139 if add_sample:
140 140 resource.report_groups.append(report_group)
141 141 report_group.reports.append(report)
142 142 added_details += 1
143 143 DBSession.flush()
144 144 if report.partition_id not in es_report_docs:
145 145 es_report_docs[report.partition_id] = []
146 146 es_report_docs[report.partition_id].append(report.es_doc())
147 147 tags.extend(list(report.tags.items()))
148 148 slow_calls = report.add_slow_calls(report_data, report_group)
149 149 DBSession.flush()
150 150 for s_call in slow_calls:
151 151 if s_call.partition_id not in es_slow_calls_docs:
152 152 es_slow_calls_docs[s_call.partition_id] = []
153 153 es_slow_calls_docs[s_call.partition_id].append(s_call.es_doc())
154 154 # try generating new stat rows if needed
155 155 else:
156 156 # required for postprocessing to not fail later
157 157 report.report_group = report_group
158 158
159 159 stat_row = ReportService.generate_stat_rows(report, resource, report_group)
160 160 if stat_row.partition_id not in es_reports_stats_rows:
161 161 es_reports_stats_rows[stat_row.partition_id] = []
162 162 es_reports_stats_rows[stat_row.partition_id].append(stat_row.es_doc())
163 163
164 164 # see if we should mark 10th occurence of report
165 165 last_occurences_10 = int(math.floor(report_group.occurences / 10))
166 166 curr_occurences_10 = int(
167 167 math.floor((report_group.occurences + report.occurences) / 10)
168 168 )
169 169 last_occurences_100 = int(math.floor(report_group.occurences / 100))
170 170 curr_occurences_100 = int(
171 171 math.floor((report_group.occurences + report.occurences) / 100)
172 172 )
173 173 notify_occurences_10 = last_occurences_10 != curr_occurences_10
174 174 notify_occurences_100 = last_occurences_100 != curr_occurences_100
175 175 report_group.occurences = ReportGroup.occurences + occurences
176 176 report_group.last_timestamp = report.start_time
177 177 report_group.summed_duration = ReportGroup.summed_duration + report.duration
178 178 summed_duration = ReportGroup.summed_duration + report.duration
179 179 summed_occurences = ReportGroup.occurences + occurences
180 180 report_group.average_duration = summed_duration / summed_occurences
181 181 report_group.run_postprocessing(report)
182 182 if added_details:
183 183 report_group.total_reports = ReportGroup.total_reports + 1
184 184 report_group.last_report = report.id
185 185 report_group.set_notification_info(
186 186 notify_10=notify_occurences_10, notify_100=notify_occurences_100
187 187 )
188 188 DBSession.flush()
189 189 report_group.get_report().notify_channel(report_group)
190 190 if report_group.partition_id not in es_report_group_docs:
191 191 es_report_group_docs[report_group.partition_id] = []
192 192 es_report_group_docs[report_group.partition_id].append(
193 193 report_group.es_doc()
194 194 )
195 195
196 196 action = "REPORT"
197 197 log_msg = "%s: %s %s, client: %s, proto: %s" % (
198 198 action,
199 199 report_data.get("http_status", "unknown"),
200 200 str(resource),
201 201 report_data.get("client"),
202 202 proto_version,
203 203 )
204 204 log.info(log_msg)
205 205 total_reports = len(dataset)
206 206 redis_pipeline = Datastores.redis.pipeline(transaction=False)
207 207 key = REDIS_KEYS["counters"]["reports_per_minute"].format(current_time)
208 208 redis_pipeline.incr(key, total_reports)
209 209 redis_pipeline.expire(key, 3600 * 24)
210 210 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
211 211 resource.owner_user_id, current_time
212 212 )
213 213 redis_pipeline.incr(key, total_reports)
214 214 redis_pipeline.expire(key, 3600)
215 215 key = REDIS_KEYS["counters"]["reports_per_hour_per_app"].format(
216 216 resource_id, current_time.replace(minute=0)
217 217 )
218 218 redis_pipeline.incr(key, total_reports)
219 219 redis_pipeline.expire(key, 3600 * 24 * 7)
220 220 redis_pipeline.sadd(
221 221 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
222 222 current_time.replace(minute=0)
223 223 ),
224 224 resource_id,
225 225 )
226 226 redis_pipeline.execute()
227 227
228 228 add_reports_es(es_report_group_docs, es_report_docs)
229 229 add_reports_slow_calls_es(es_slow_calls_docs)
230 230 add_reports_stats_rows_es(es_reports_stats_rows)
231 231 return True
232 232 except Exception as exc:
233 233 print_traceback(log)
234 234 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
235 235 raise
236 236 add_reports.retry(exc=exc)
237 237
238 238
239 239 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
240 240 def add_reports_es(report_group_docs, report_docs):
241 241 for k, v in report_group_docs.items():
242 242 to_update = {"_index": k, "_type": "report_group"}
243 243 [i.update(to_update) for i in v]
244 244 elasticsearch.helpers.bulk(Datastores.es, v)
245 245 for k, v in report_docs.items():
246 246 to_update = {"_index": k, "_type": "report"}
247 247 [i.update(to_update) for i in v]
248 248 elasticsearch.helpers.bulk(Datastores.es, v)
249 249
250 250
251 251 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
252 252 def add_reports_slow_calls_es(es_docs):
253 253 for k, v in es_docs.items():
254 254 to_update = {"_index": k, "_type": "log"}
255 255 [i.update(to_update) for i in v]
256 256 elasticsearch.helpers.bulk(Datastores.es, v)
257 257
258 258
259 259 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
260 260 def add_reports_stats_rows_es(es_docs):
261 261 for k, v in es_docs.items():
262 262 to_update = {"_index": k, "_type": "log"}
263 263 [i.update(to_update) for i in v]
264 264 elasticsearch.helpers.bulk(Datastores.es, v)
265 265
266 266
267 267 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
268 268 def add_logs(resource_id, request_params, dataset, **kwargs):
269 269 proto_version = request_params.get("protocol_version")
270 270 current_time = datetime.utcnow().replace(second=0, microsecond=0)
271 271
272 272 try:
273 273 es_docs = collections.defaultdict(list)
274 274 resource = ApplicationService.by_id_cached()(resource_id)
275 275 resource = DBSession.merge(resource, load=False)
276 276 ns_pairs = []
277 277 for entry in dataset:
278 278 # gather pk and ns so we can remove older versions of row later
279 279 if entry["primary_key"] is not None:
280 280 ns_pairs.append({"pk": entry["primary_key"], "ns": entry["namespace"]})
281 281 log_entry = Log()
282 282 log_entry.set_data(entry, resource=resource)
283 283 log_entry._skip_ft_index = True
284 284 resource.logs.append(log_entry)
285 285 DBSession.flush()
286 286 # insert non pk rows first
287 287 if entry["primary_key"] is None:
288 288 es_docs[log_entry.partition_id].append(log_entry.es_doc())
289 289
290 290 # 2nd pass to delete all log entries from db for same pk/ns pair
291 291 if ns_pairs:
292 292 ids_to_delete = []
293 293 es_docs = collections.defaultdict(list)
294 294 es_docs_to_delete = collections.defaultdict(list)
295 295 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
296 296 list_of_pairs=ns_pairs
297 297 )
298 298 log_dict = {}
299 299 for log_entry in found_pkey_logs:
300 300 log_key = (log_entry.primary_key, log_entry.namespace)
301 301 if log_key not in log_dict:
302 302 log_dict[log_key] = []
303 303 log_dict[log_key].append(log_entry)
304 304
305 305 for ns, entry_list in log_dict.items():
306 306 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
307 307 # newest row needs to be indexed in es
308 308 log_entry = entry_list[-1]
309 309 # delete everything from pg and ES, leave the last row in pg
310 310 for e in entry_list[:-1]:
311 311 ids_to_delete.append(e.log_id)
312 312 es_docs_to_delete[e.partition_id].append(e.delete_hash)
313 313
314 314 es_docs_to_delete[log_entry.partition_id].append(log_entry.delete_hash)
315 315
316 316 es_docs[log_entry.partition_id].append(log_entry.es_doc())
317 317
318 318 if ids_to_delete:
319 319 query = DBSession.query(Log).filter(Log.log_id.in_(ids_to_delete))
320 320 query.delete(synchronize_session=False)
321 321 if es_docs_to_delete:
322 322 # batch this to avoid problems with default ES bulk limits
323 323 for es_index in es_docs_to_delete.keys():
324 324 for batch in in_batches(es_docs_to_delete[es_index], 20):
325 325 query = {"query": {"terms": {"delete_hash": batch}}}
326 326
327 327 try:
328 328 Datastores.es.transport.perform_request(
329 329 "DELETE",
330 330 "/{}/{}/_query".format(es_index, "log"),
331 331 body=query,
332 332 )
333 333 except elasticsearch.exceptions.NotFoundError as exc:
334 334 msg = "skipping index {}".format(es_index)
335 335 log.info(msg)
336 336
337 337 total_logs = len(dataset)
338 338
339 339 log_msg = "LOG_NEW: %s, entries: %s, proto:%s" % (
340 340 str(resource),
341 341 total_logs,
342 342 proto_version,
343 343 )
344 344 log.info(log_msg)
345 345 # mark_changed(session)
346 346 redis_pipeline = Datastores.redis.pipeline(transaction=False)
347 347 key = REDIS_KEYS["counters"]["logs_per_minute"].format(current_time)
348 348 redis_pipeline.incr(key, total_logs)
349 349 redis_pipeline.expire(key, 3600 * 24)
350 350 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
351 351 resource.owner_user_id, current_time
352 352 )
353 353 redis_pipeline.incr(key, total_logs)
354 354 redis_pipeline.expire(key, 3600)
355 355 key = REDIS_KEYS["counters"]["logs_per_hour_per_app"].format(
356 356 resource_id, current_time.replace(minute=0)
357 357 )
358 358 redis_pipeline.incr(key, total_logs)
359 359 redis_pipeline.expire(key, 3600 * 24 * 7)
360 360 redis_pipeline.sadd(
361 361 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
362 362 current_time.replace(minute=0)
363 363 ),
364 364 resource_id,
365 365 )
366 366 redis_pipeline.execute()
367 367 add_logs_es(es_docs)
368 368 return True
369 369 except Exception as exc:
370 370 print_traceback(log)
371 371 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
372 372 raise
373 373 add_logs.retry(exc=exc)
374 374
375 375
376 376 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
377 377 def add_logs_es(es_docs):
378 378 for k, v in es_docs.items():
379 379 to_update = {"_index": k, "_type": "log"}
380 380 [i.update(to_update) for i in v]
381 381 elasticsearch.helpers.bulk(Datastores.es, v)
382 382
383 383
384 384 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
385 385 def add_metrics(resource_id, request_params, dataset, proto_version):
386 386 current_time = datetime.utcnow().replace(second=0, microsecond=0)
387 387 try:
388 388 resource = ApplicationService.by_id_cached()(resource_id)
389 389 resource = DBSession.merge(resource, load=False)
390 390 es_docs = []
391 391 rows = []
392 392 for metric in dataset:
393 393 tags = dict(metric["tags"])
394 394 server_n = tags.get("server_name", metric["server_name"]).lower()
395 395 tags["server_name"] = server_n or "unknown"
396 396 new_metric = Metric(
397 397 timestamp=metric["timestamp"],
398 398 resource_id=resource.resource_id,
399 399 namespace=metric["namespace"],
400 400 tags=tags,
401 401 )
402 402 rows.append(new_metric)
403 403 es_docs.append(new_metric.es_doc())
404 404 session = DBSession()
405 405 session.bulk_save_objects(rows)
406 406 session.flush()
407 407
408 408 action = "METRICS"
409 409 metrics_msg = "%s: %s, metrics: %s, proto:%s" % (
410 410 action,
411 411 str(resource),
412 412 len(dataset),
413 413 proto_version,
414 414 )
415 415 log.info(metrics_msg)
416 416
417 417 mark_changed(session)
418 418 redis_pipeline = Datastores.redis.pipeline(transaction=False)
419 419 key = REDIS_KEYS["counters"]["metrics_per_minute"].format(current_time)
420 420 redis_pipeline.incr(key, len(rows))
421 421 redis_pipeline.expire(key, 3600 * 24)
422 422 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
423 423 resource.owner_user_id, current_time
424 424 )
425 425 redis_pipeline.incr(key, len(rows))
426 426 redis_pipeline.expire(key, 3600)
427 427 key = REDIS_KEYS["counters"]["metrics_per_hour_per_app"].format(
428 428 resource_id, current_time.replace(minute=0)
429 429 )
430 430 redis_pipeline.incr(key, len(rows))
431 431 redis_pipeline.expire(key, 3600 * 24 * 7)
432 432 redis_pipeline.sadd(
433 433 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
434 434 current_time.replace(minute=0)
435 435 ),
436 436 resource_id,
437 437 )
438 438 redis_pipeline.execute()
439 439 add_metrics_es(es_docs)
440 440 return True
441 441 except Exception as exc:
442 442 print_traceback(log)
443 443 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
444 444 raise
445 445 add_metrics.retry(exc=exc)
446 446
447 447
448 448 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
449 449 def add_metrics_es(es_docs):
450 450 for doc in es_docs:
451 451 partition = "rcae_m_%s" % doc["timestamp"].strftime("%Y_%m_%d")
452 452 Datastores.es.index(partition, "log", doc)
453 453
454 454
455 455 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
456 456 def check_user_report_notifications(resource_id):
457 457 since_when = datetime.utcnow()
458 458 try:
459 459 request = get_current_request()
460 460 application = ApplicationService.by_id(resource_id)
461 461 if not application:
462 462 return
463 463 error_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
464 464 ReportType.error, resource_id
465 465 )
466 466 slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
467 467 ReportType.slow, resource_id
468 468 )
469 469 error_group_ids = Datastores.redis.smembers(error_key)
470 470 slow_group_ids = Datastores.redis.smembers(slow_key)
471 471 Datastores.redis.delete(error_key)
472 472 Datastores.redis.delete(slow_key)
473 473 err_gids = [int(g_id) for g_id in error_group_ids]
474 474 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
475 475 group_ids = err_gids + slow_gids
476 476 occurence_dict = {}
477 477 for g_id in group_ids:
478 478 key = REDIS_KEYS["counters"]["report_group_occurences"].format(g_id)
479 479 val = Datastores.redis.get(key)
480 480 Datastores.redis.delete(key)
481 481 if val:
482 482 occurence_dict[g_id] = int(val)
483 483 else:
484 484 occurence_dict[g_id] = 1
485 485 report_groups = ReportGroupService.by_ids(group_ids)
486 486 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
487 487
488 488 ApplicationService.check_for_groups_alert(
489 489 application,
490 490 "alert",
491 491 report_groups=report_groups,
492 492 occurence_dict=occurence_dict,
493 493 )
494 494 users = set(
495 495 [p.user for p in ResourceService.users_for_perm(application, "view")]
496 496 )
497 497 report_groups = report_groups.all()
498 498 for user in users:
499 499 UserService.report_notify(
500 500 user,
501 501 request,
502 502 application,
503 503 report_groups=report_groups,
504 504 occurence_dict=occurence_dict,
505 505 )
506 506 for group in report_groups:
507 507 # marks report_groups as notified
508 508 if not group.notified:
509 509 group.notified = True
510 510 except Exception as exc:
511 511 print_traceback(log)
512 512 raise
513 513
514 514
515 515 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
516 516 def check_alerts(resource_id):
517 517 since_when = datetime.utcnow()
518 518 try:
519 519 request = get_current_request()
520 520 application = ApplicationService.by_id(resource_id)
521 521 if not application:
522 522 return
523 523 error_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
524 524 ReportType.error, resource_id
525 525 )
526 526 slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
527 527 ReportType.slow, resource_id
528 528 )
529 529 error_group_ids = Datastores.redis.smembers(error_key)
530 530 slow_group_ids = Datastores.redis.smembers(slow_key)
531 531 Datastores.redis.delete(error_key)
532 532 Datastores.redis.delete(slow_key)
533 533 err_gids = [int(g_id) for g_id in error_group_ids]
534 534 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
535 535 group_ids = err_gids + slow_gids
536 536 occurence_dict = {}
537 537 for g_id in group_ids:
538 538 key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format(
539 539 g_id
540 540 )
541 541 val = Datastores.redis.get(key)
542 542 Datastores.redis.delete(key)
543 543 if val:
544 544 occurence_dict[g_id] = int(val)
545 545 else:
546 546 occurence_dict[g_id] = 1
547 547 report_groups = ReportGroupService.by_ids(group_ids)
548 548 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
549 549
550 550 ApplicationService.check_for_groups_alert(
551 551 application,
552 552 "alert",
553 553 report_groups=report_groups,
554 554 occurence_dict=occurence_dict,
555 555 since_when=since_when,
556 556 )
557 557 except Exception as exc:
558 558 print_traceback(log)
559 559 raise
560 560
561 561
562 562 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
563 563 def close_alerts():
564 564 log.warning("Checking alerts")
565 565 since_when = datetime.utcnow()
566 566 try:
567 567 event_types = [
568 568 Event.types["error_report_alert"],
569 569 Event.types["slow_report_alert"],
570 570 ]
571 571 statuses = [Event.statuses["active"]]
572 572 # get events older than 5 min
573 573 events = EventService.by_type_and_status(
574 574 event_types, statuses, older_than=(since_when - timedelta(minutes=5))
575 575 )
576 576 for event in events:
577 577 # see if we can close them
578 578 event.validate_or_close(since_when=(since_when - timedelta(minutes=1)))
579 579 except Exception as exc:
580 580 print_traceback(log)
581 581 raise
582 582
583 583
584 584 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
585 585 def update_tag_counter(tag_name, tag_value, count):
586 586 try:
587 587 query = (
588 588 DBSession.query(Tag)
589 589 .filter(Tag.name == tag_name)
590 590 .filter(
591 591 sa.cast(Tag.value, sa.types.TEXT)
592 592 == sa.cast(json.dumps(tag_value), sa.types.TEXT)
593 593 )
594 594 )
595 595 query.update(
596 596 {"times_seen": Tag.times_seen + count, "last_timestamp": datetime.utcnow()},
597 597 synchronize_session=False,
598 598 )
599 599 session = DBSession()
600 600 mark_changed(session)
601 601 return True
602 602 except Exception as exc:
603 603 print_traceback(log)
604 604 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
605 605 raise
606 606 update_tag_counter.retry(exc=exc)
607 607
608 608
609 609 @celery.task(queue="default")
610 610 def update_tag_counters():
611 611 """
612 612 Sets task to update counters for application tags
613 613 """
614 614 tags = Datastores.redis.lrange(REDIS_KEYS["seen_tag_list"], 0, -1)
615 615 Datastores.redis.delete(REDIS_KEYS["seen_tag_list"])
616 616 c = collections.Counter(tags)
617 617 for t_json, count in c.items():
618 618 tag_info = json.loads(t_json)
619 619 update_tag_counter.delay(tag_info[0], tag_info[1], count)
620 620
621 621
622 622 @celery.task(queue="default")
623 623 def daily_digest():
624 624 """
625 625 Sends daily digest with top 50 error reports
626 626 """
627 627 request = get_current_request()
628 628 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
629 629 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
630 630 since_when = datetime.utcnow() - timedelta(hours=8)
631 631 log.warning("Generating daily digests")
632 632 for resource_id in apps:
633 633 resource_id = resource_id.decode("utf8")
634 634 end_date = datetime.utcnow().replace(microsecond=0, second=0)
635 635 filter_settings = {
636 636 "resource": [resource_id],
637 637 "tags": [{"name": "type", "value": ["error"], "op": None}],
638 638 "type": "error",
639 639 "start_date": since_when,
640 640 "end_date": end_date,
641 641 }
642 642
643 643 reports = ReportGroupService.get_trending(
644 644 request, filter_settings=filter_settings, limit=50
645 645 )
646 646
647 647 application = ApplicationService.by_id(resource_id)
648 648 if application:
649 649 users = set(
650 650 [p.user for p in ResourceService.users_for_perm(application, "view")]
651 651 )
652 652 for user in users:
653 653 user.send_digest(
654 654 request, application, reports=reports, since_when=since_when
655 655 )
656 656
657 657
658 658 @celery.task(queue="default")
659 659 def notifications_reports():
660 660 """
661 661 Loop that checks redis for info and then issues new tasks to celery to
662 662 issue notifications
663 663 """
664 664 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
665 665 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
666 666 for app in apps:
667 667 log.warning("Notify for app: %s" % app)
668 668 check_user_report_notifications.delay(app.decode("utf8"))
669 669
670 670
671 671 @celery.task(queue="default")
672 672 def alerting_reports():
673 673 """
674 674 Loop that checks redis for info and then issues new tasks to celery to
675 675 perform the following:
676 676 - which applications should have new alerts opened
677 677 """
678 678
679 679 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports_alerting"])
680 680 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports_alerting"])
681 681 for app in apps:
682 682 log.warning("Notify for app: %s" % app)
683 683 check_alerts.delay(app.decode("utf8"))
684 684
685 685
686 686 @celery.task(
687 687 queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4, max_retries=144
688 688 )
689 689 def logs_cleanup(resource_id, filter_settings):
690 690 request = get_current_request()
691 691 request.tm.begin()
692 692 es_query = {
693 693 "query": {
694 "bool": {"filter": {"and": [{"term": {"resource_id": resource_id}}]}}
694 "bool": {"filter": [{"term": {"resource_id": resource_id}}]}
695 695 }
696 696 }
697 697
698 698 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
699 699 if filter_settings["namespace"]:
700 700 query = query.filter(Log.namespace == filter_settings["namespace"][0])
701 es_query["query"]["bool"]["filter"]["and"].append(
701 es_query["query"]["bool"]["filter"].append(
702 702 {"term": {"namespace": filter_settings["namespace"][0]}}
703 703 )
704 704 query.delete(synchronize_session=False)
705 705 request.tm.commit()
706 706 Datastores.es.transport.perform_request(
707 707 "DELETE", "/{}/{}/_query".format("rcae_l_*", "log"), body=es_query
708 708 )
@@ -1,533 +1,529 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from datetime import datetime, timedelta
18 18 import math
19 19 import uuid
20 20 import hashlib
21 21 import copy
22 22 import urllib.parse
23 23 import logging
24 24 import sqlalchemy as sa
25 25
26 26 from appenlight.models import Base, Datastores
27 27 from appenlight.lib.utils.date_utils import convert_date
28 28 from appenlight.lib.utils import convert_es_type
29 29 from appenlight.models.slow_call import SlowCall
30 30 from appenlight.lib.utils import channelstream_request
31 31 from appenlight.lib.enums import ReportType, Language
32 32 from pyramid.threadlocal import get_current_registry, get_current_request
33 33 from sqlalchemy.dialects.postgresql import JSON
34 34 from ziggurat_foundations.models.base import BaseModel
35 35
36 36 log = logging.getLogger(__name__)
37 37
38 38 REPORT_TYPE_MATRIX = {
39 39 "http_status": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
40 40 "group:priority": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
41 41 "duration": {"type": "float", "ops": ("ge", "le")},
42 42 "url_domain": {
43 43 "type": "unicode",
44 44 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
45 45 },
46 46 "url_path": {
47 47 "type": "unicode",
48 48 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
49 49 },
50 50 "error": {
51 51 "type": "unicode",
52 52 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
53 53 },
54 54 "tags:server_name": {
55 55 "type": "unicode",
56 56 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
57 57 },
58 58 "traceback": {"type": "unicode", "ops": ("contains",)},
59 59 "group:occurences": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
60 60 }
61 61
62 62
63 63 class Report(Base, BaseModel):
64 64 __tablename__ = "reports"
65 65 __table_args__ = {"implicit_returning": False}
66 66
67 67 id = sa.Column(sa.Integer, nullable=False, primary_key=True)
68 68 group_id = sa.Column(
69 69 sa.BigInteger,
70 70 sa.ForeignKey("reports_groups.id", ondelete="cascade", onupdate="cascade"),
71 71 )
72 72 resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
73 73 report_type = sa.Column(sa.Integer(), nullable=False, index=True)
74 74 error = sa.Column(sa.UnicodeText(), index=True)
75 75 extra = sa.Column(JSON(), default={})
76 76 request = sa.Column(JSON(), nullable=False, default={})
77 77 ip = sa.Column(sa.String(39), index=True, default="")
78 78 username = sa.Column(sa.Unicode(255), default="")
79 79 user_agent = sa.Column(sa.Unicode(255), default="")
80 80 url = sa.Column(sa.UnicodeText(), index=True)
81 81 request_id = sa.Column(sa.Text())
82 82 request_stats = sa.Column(JSON(), nullable=False, default={})
83 83 traceback = sa.Column(JSON(), nullable=False, default=None)
84 84 traceback_hash = sa.Column(sa.Text())
85 85 start_time = sa.Column(
86 86 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
87 87 )
88 88 end_time = sa.Column(sa.DateTime())
89 89 duration = sa.Column(sa.Float, default=0)
90 90 http_status = sa.Column(sa.Integer, index=True)
91 91 url_domain = sa.Column(sa.Unicode(100), index=True)
92 92 url_path = sa.Column(sa.Unicode(255), index=True)
93 93 tags = sa.Column(JSON(), nullable=False, default={})
94 94 language = sa.Column(sa.Integer(), default=0)
95 95 # this is used to determine partition for the report
96 96 report_group_time = sa.Column(
97 97 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
98 98 )
99 99
100 100 logs = sa.orm.relationship(
101 101 "Log",
102 102 lazy="dynamic",
103 103 passive_deletes=True,
104 104 passive_updates=True,
105 105 primaryjoin="and_(Report.request_id==Log.request_id, "
106 106 "Log.request_id != None, Log.request_id != '')",
107 107 foreign_keys="[Log.request_id]",
108 108 )
109 109
110 110 slow_calls = sa.orm.relationship(
111 111 "SlowCall",
112 112 backref="detail",
113 113 cascade="all, delete-orphan",
114 114 passive_deletes=True,
115 115 passive_updates=True,
116 116 order_by="SlowCall.timestamp",
117 117 )
118 118
119 119 def set_data(self, data, resource, protocol_version=None):
120 120 self.http_status = data["http_status"]
121 121 self.priority = data["priority"]
122 122 self.error = data["error"]
123 123 report_language = data.get("language", "").lower()
124 124 self.language = getattr(Language, report_language, Language.unknown)
125 125 # we need temp holder here to decide later
126 126 # if we want to commit the tags if report is marked for creation
127 127 self.tags = {"server_name": data["server"], "view_name": data["view_name"]}
128 128 if data.get("tags"):
129 129 for tag_tuple in data["tags"]:
130 130 self.tags[tag_tuple[0]] = tag_tuple[1]
131 131 self.traceback = data["traceback"]
132 132 stripped_traceback = self.stripped_traceback()
133 133 tb_repr = repr(stripped_traceback).encode("utf8")
134 134 self.traceback_hash = hashlib.sha1(tb_repr).hexdigest()
135 135 url_info = urllib.parse.urlsplit(data.get("url", ""), allow_fragments=False)
136 136 self.url_domain = url_info.netloc[:128]
137 137 self.url_path = url_info.path[:2048]
138 138 self.occurences = data["occurences"]
139 139 if self.error:
140 140 self.report_type = ReportType.error
141 141 else:
142 142 self.report_type = ReportType.slow
143 143
144 144 # but if its status is 404, it's a 404 type
145 145 if self.http_status in [404, "404"] or self.error == "404 Not Found":
146 146 self.report_type = ReportType.not_found
147 147 self.error = ""
148 148
149 149 self.generate_grouping_hash(
150 150 data.get("appenlight.group_string", data.get("group_string")),
151 151 resource.default_grouping,
152 152 protocol_version,
153 153 )
154 154
155 155 # details
156 156 if data["http_status"] in [404, "404"]:
157 157 data = {
158 158 "username": data["username"],
159 159 "ip": data["ip"],
160 160 "url": data["url"],
161 161 "user_agent": data["user_agent"],
162 162 }
163 163 if data.get("HTTP_REFERER") or data.get("http_referer"):
164 164 data["HTTP_REFERER"] = data.get("HTTP_REFERER", "") or data.get(
165 165 "http_referer", ""
166 166 )
167 167
168 168 self.resource_id = resource.resource_id
169 169 self.username = data["username"]
170 170 self.user_agent = data["user_agent"]
171 171 self.ip = data["ip"]
172 172 self.extra = {}
173 173 if data.get("extra"):
174 174 for extra_tuple in data["extra"]:
175 175 self.extra[extra_tuple[0]] = extra_tuple[1]
176 176
177 177 self.url = data["url"]
178 178 self.request_id = data.get("request_id", "").replace("-", "") or str(
179 179 uuid.uuid4()
180 180 )
181 181 request_data = data.get("request", {})
182 182
183 183 self.request = request_data
184 184 self.request_stats = data.get("request_stats", {})
185 185 traceback = data.get("traceback")
186 186 if not traceback:
187 187 traceback = data.get("frameinfo")
188 188 self.traceback = traceback
189 189 start_date = convert_date(data.get("start_time"))
190 190 if not self.start_time or self.start_time < start_date:
191 191 self.start_time = start_date
192 192
193 193 self.end_time = convert_date(data.get("end_time"), False)
194 194 self.duration = 0
195 195
196 196 if self.start_time and self.end_time:
197 197 d = self.end_time - self.start_time
198 198 self.duration = d.total_seconds()
199 199
200 200 # update tags with other vars
201 201 if self.username:
202 202 self.tags["user_name"] = self.username
203 203 self.tags["report_language"] = Language.key_from_value(self.language)
204 204
205 205 def add_slow_calls(self, data, report_group):
206 206 slow_calls = []
207 207 for call in data.get("slow_calls", []):
208 208 sc_inst = SlowCall()
209 209 sc_inst.set_data(
210 210 call, resource_id=self.resource_id, report_group=report_group
211 211 )
212 212 slow_calls.append(sc_inst)
213 213 self.slow_calls.extend(slow_calls)
214 214 return slow_calls
215 215
216 216 def get_dict(self, request, details=False, exclude_keys=None, include_keys=None):
217 217 from appenlight.models.services.report_group import ReportGroupService
218 218
219 219 instance_dict = super(Report, self).get_dict()
220 220 instance_dict["req_stats"] = self.req_stats()
221 221 instance_dict["group"] = {}
222 222 instance_dict["group"]["id"] = self.report_group.id
223 223 instance_dict["group"]["total_reports"] = self.report_group.total_reports
224 224 instance_dict["group"]["last_report"] = self.report_group.last_report
225 225 instance_dict["group"]["priority"] = self.report_group.priority
226 226 instance_dict["group"]["occurences"] = self.report_group.occurences
227 227 instance_dict["group"]["last_timestamp"] = self.report_group.last_timestamp
228 228 instance_dict["group"]["first_timestamp"] = self.report_group.first_timestamp
229 229 instance_dict["group"]["public"] = self.report_group.public
230 230 instance_dict["group"]["fixed"] = self.report_group.fixed
231 231 instance_dict["group"]["read"] = self.report_group.read
232 232 instance_dict["group"]["average_duration"] = self.report_group.average_duration
233 233
234 234 instance_dict["resource_name"] = self.report_group.application.resource_name
235 235 instance_dict["report_type"] = self.report_type
236 236
237 237 if instance_dict["http_status"] == 404 and not instance_dict["error"]:
238 238 instance_dict["error"] = "404 Not Found"
239 239
240 240 if details:
241 241 instance_dict[
242 242 "affected_users_count"
243 243 ] = ReportGroupService.affected_users_count(self.report_group)
244 244 instance_dict["top_affected_users"] = [
245 245 {"username": u.username, "count": u.count}
246 246 for u in ReportGroupService.top_affected_users(self.report_group)
247 247 ]
248 248 instance_dict["application"] = {"integrations": []}
249 249 for integration in self.report_group.application.integrations:
250 250 if integration.front_visible:
251 251 instance_dict["application"]["integrations"].append(
252 252 {
253 253 "name": integration.integration_name,
254 254 "action": integration.integration_action,
255 255 }
256 256 )
257 257 instance_dict["comments"] = [
258 258 c.get_dict() for c in self.report_group.comments
259 259 ]
260 260
261 261 instance_dict["group"]["next_report"] = None
262 262 instance_dict["group"]["previous_report"] = None
263 263 next_in_group = self.get_next_in_group(request)
264 264 previous_in_group = self.get_previous_in_group(request)
265 265 if next_in_group:
266 266 instance_dict["group"]["next_report"] = next_in_group
267 267 if previous_in_group:
268 268 instance_dict["group"]["previous_report"] = previous_in_group
269 269
270 270 # slow call ordering
271 271 def find_parent(row, data):
272 272 for r in reversed(data):
273 273 try:
274 274 if (
275 275 row["timestamp"] > r["timestamp"]
276 276 and row["end_time"] < r["end_time"]
277 277 ):
278 278 return r
279 279 except TypeError as e:
280 280 log.warning("reports_view.find_parent: %s" % e)
281 281 return None
282 282
283 283 new_calls = []
284 284 calls = [c.get_dict() for c in self.slow_calls]
285 285 while calls:
286 286 # start from end
287 287 for x in range(len(calls) - 1, -1, -1):
288 288 parent = find_parent(calls[x], calls)
289 289 if parent:
290 290 parent["children"].append(calls[x])
291 291 else:
292 292 # no parent at all? append to new calls anyways
293 293 new_calls.append(calls[x])
294 294 # print 'append', calls[x]
295 295 del calls[x]
296 296 break
297 297 instance_dict["slow_calls"] = new_calls
298 298
299 299 instance_dict["front_url"] = self.get_public_url(request)
300 300
301 301 exclude_keys_list = exclude_keys or []
302 302 include_keys_list = include_keys or []
303 303 for k in list(instance_dict.keys()):
304 304 if k == "group":
305 305 continue
306 306 if k in exclude_keys_list or (k not in include_keys_list and include_keys):
307 307 del instance_dict[k]
308 308 return instance_dict
309 309
310 310 def get_previous_in_group(self, request):
311 311 query = {
312 312 "size": 1,
313 313 "query": {
314 314 "bool": {
315 "filter": {
316 "and": [
317 {"term": {"group_id": self.group_id}},
318 {"range": {"pg_id": {"lt": self.id}}},
319 ]
320 }
315 "filter": [
316 {"term": {"group_id": self.group_id}},
317 {"range": {"pg_id": {"lt": self.id}}},
318 ]
321 319 }
322 320 },
323 321 "sort": [{"_doc": {"order": "desc"}}],
324 322 }
325 323 result = request.es_conn.search(
326 324 body=query, index=self.partition_id, doc_type="report"
327 325 )
328 326 if result["hits"]["total"]:
329 327 return result["hits"]["hits"][0]["_source"]["pg_id"]
330 328
331 329 def get_next_in_group(self, request):
332 330 query = {
333 331 "size": 1,
334 332 "query": {
335 333 "bool": {
336 "filter": {
337 "and": [
338 {"term": {"group_id": self.group_id}},
339 {"range": {"pg_id": {"gt": self.id}}},
340 ]
341 }
334 "filter": [
335 {"term": {"group_id": self.group_id}},
336 {"range": {"pg_id": {"gt": self.id}}},
337 ]
342 338 }
343 339 },
344 340 "sort": [{"_doc": {"order": "asc"}}],
345 341 }
346 342 result = request.es_conn.search(
347 343 body=query, index=self.partition_id, doc_type="report"
348 344 )
349 345 if result["hits"]["total"]:
350 346 return result["hits"]["hits"][0]["_source"]["pg_id"]
351 347
352 348 def get_public_url(self, request=None, report_group=None, _app_url=None):
353 349 """
354 350 Returns url that user can use to visit specific report
355 351 """
356 352 if not request:
357 353 request = get_current_request()
358 354 url = request.route_url("/", _app_url=_app_url)
359 355 if report_group:
360 356 return (url + "ui/report/%s/%s") % (report_group.id, self.id)
361 357 return (url + "ui/report/%s/%s") % (self.group_id, self.id)
362 358
363 359 def req_stats(self):
364 360 stats = self.request_stats.copy()
365 361 stats["percentages"] = {}
366 362 stats["percentages"]["main"] = 100.0
367 363 main = stats.get("main", 0.0)
368 364 if not main:
369 365 return None
370 366 for name, call_time in stats.items():
371 367 if "calls" not in name and "main" not in name and "percentages" not in name:
372 368 stats["main"] -= call_time
373 369 stats["percentages"][name] = math.floor((call_time / main * 100.0))
374 370 stats["percentages"]["main"] -= stats["percentages"][name]
375 371 if stats["percentages"]["main"] < 0.0:
376 372 stats["percentages"]["main"] = 0.0
377 373 stats["main"] = 0.0
378 374 return stats
379 375
380 376 def generate_grouping_hash(
381 377 self, hash_string=None, default_grouping=None, protocol_version=None
382 378 ):
383 379 """
384 380 Generates SHA1 hash that will be used to group reports together
385 381 """
386 382 if not hash_string:
387 383 location = self.tags.get("view_name") or self.url_path
388 384 server_name = self.tags.get("server_name") or ""
389 385 if default_grouping == "url_traceback":
390 386 hash_string = "%s_%s_%s" % (self.traceback_hash, location, self.error)
391 387 if self.language == Language.javascript:
392 388 hash_string = "%s_%s" % (self.traceback_hash, self.error)
393 389
394 390 elif default_grouping == "traceback_server":
395 391 hash_string = "%s_%s" % (self.traceback_hash, server_name)
396 392 if self.language == Language.javascript:
397 393 hash_string = "%s_%s" % (self.traceback_hash, server_name)
398 394 else:
399 395 hash_string = "%s_%s" % (self.error, location)
400 396 month = datetime.utcnow().date().replace(day=1)
401 397 hash_string = "{}_{}".format(month, hash_string)
402 398 binary_string = hash_string.encode("utf8")
403 399 self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
404 400 return self.grouping_hash
405 401
406 402 def stripped_traceback(self):
407 403 """
408 404 Traceback without local vars
409 405 """
410 406 stripped_traceback = copy.deepcopy(self.traceback)
411 407
412 408 if isinstance(stripped_traceback, list):
413 409 for row in stripped_traceback:
414 410 row.pop("vars", None)
415 411 return stripped_traceback
416 412
417 413 def notify_channel(self, report_group):
418 414 """
419 415 Sends notification to websocket channel
420 416 """
421 417 settings = get_current_registry().settings
422 418 log.info("notify channelstream")
423 419 if self.report_type != ReportType.error:
424 420 return
425 421 payload = {
426 422 "type": "message",
427 423 "user": "__system__",
428 424 "channel": "app_%s" % self.resource_id,
429 425 "message": {
430 426 "topic": "front_dashboard.new_topic",
431 427 "report": {
432 428 "group": {
433 429 "priority": report_group.priority,
434 430 "first_timestamp": report_group.first_timestamp,
435 431 "last_timestamp": report_group.last_timestamp,
436 432 "average_duration": report_group.average_duration,
437 433 "occurences": report_group.occurences,
438 434 },
439 435 "report_id": self.id,
440 436 "group_id": self.group_id,
441 437 "resource_id": self.resource_id,
442 438 "http_status": self.http_status,
443 439 "url_domain": self.url_domain,
444 440 "url_path": self.url_path,
445 441 "error": self.error or "",
446 442 "server": self.tags.get("server_name"),
447 443 "view_name": self.tags.get("view_name"),
448 444 "front_url": self.get_public_url(),
449 445 },
450 446 },
451 447 }
452 448 channelstream_request(
453 449 settings["cometd.secret"],
454 450 "/message",
455 451 [payload],
456 452 servers=[settings["cometd_servers"]],
457 453 )
458 454
459 455 def es_doc(self):
460 456 tags = {}
461 457 tag_list = []
462 458 for name, value in self.tags.items():
463 459 name = name.replace(".", "_")
464 460 tag_list.append(name)
465 461 tags[name] = {
466 462 "values": convert_es_type(value),
467 463 "numeric_values": value
468 464 if (isinstance(value, (int, float)) and not isinstance(value, bool))
469 465 else None,
470 466 }
471 467
472 468 if "user_name" not in self.tags and self.username:
473 469 tags["user_name"] = {"value": [self.username], "numeric_value": None}
474 470 return {
475 471 "_id": str(self.id),
476 472 "pg_id": str(self.id),
477 473 "resource_id": self.resource_id,
478 474 "http_status": self.http_status or "",
479 475 "start_time": self.start_time,
480 476 "end_time": self.end_time,
481 477 "url_domain": self.url_domain if self.url_domain else "",
482 478 "url_path": self.url_path if self.url_path else "",
483 479 "duration": self.duration,
484 480 "error": self.error if self.error else "",
485 481 "report_type": self.report_type,
486 482 "request_id": self.request_id,
487 483 "ip": self.ip,
488 484 "group_id": str(self.group_id),
489 485 "_parent": str(self.group_id),
490 486 "tags": tags,
491 487 "tag_list": tag_list,
492 488 }
493 489
494 490 @property
495 491 def partition_id(self):
496 492 return "rcae_r_%s" % self.report_group_time.strftime("%Y_%m")
497 493
498 494 def partition_range(self):
499 495 start_date = self.report_group_time.date().replace(day=1)
500 496 end_date = start_date + timedelta(days=40)
501 497 end_date = end_date.replace(day=1)
502 498 return start_date, end_date
503 499
504 500
505 501 def after_insert(mapper, connection, target):
506 502 if not hasattr(target, "_skip_ft_index"):
507 503 data = target.es_doc()
508 504 data.pop("_id", None)
509 505 Datastores.es.index(
510 506 target.partition_id, "report", data, parent=target.group_id, id=target.id
511 507 )
512 508
513 509
514 510 def after_update(mapper, connection, target):
515 511 if not hasattr(target, "_skip_ft_index"):
516 512 data = target.es_doc()
517 513 data.pop("_id", None)
518 514 Datastores.es.index(
519 515 target.partition_id, "report", data, parent=target.group_id, id=target.id
520 516 )
521 517
522 518
523 519 def after_delete(mapper, connection, target):
524 520 if not hasattr(target, "_skip_ft_index"):
525 521 query = {"query": {"term": {"pg_id": target.id}}}
526 522 Datastores.es.transport.perform_request(
527 523 "DELETE", "/{}/{}/_query".format(target.partition_id, "report"), body=query
528 524 )
529 525
530 526
531 527 sa.event.listen(Report, "after_insert", after_insert)
532 528 sa.event.listen(Report, "after_update", after_update)
533 529 sa.event.listen(Report, "after_delete", after_delete)
@@ -1,222 +1,222 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import paginate
18 18 import logging
19 19 import sqlalchemy as sa
20 20
21 21 from appenlight.models.log import Log
22 22 from appenlight.models import get_db_session, Datastores
23 23 from appenlight.models.services.base import BaseService
24 24 from appenlight.lib.utils import es_index_name_limiter
25 25
26 26 log = logging.getLogger(__name__)
27 27
28 28
29 29 class LogService(BaseService):
30 30 @classmethod
31 31 def get_logs(cls, resource_ids=None, filter_settings=None, db_session=None):
32 32 # ensure we always have id's passed
33 33 if not resource_ids:
34 34 # raise Exception('No App ID passed')
35 35 return []
36 36 db_session = get_db_session(db_session)
37 37 q = db_session.query(Log)
38 38 q = q.filter(Log.resource_id.in_(resource_ids))
39 39 if filter_settings.get("start_date"):
40 40 q = q.filter(Log.timestamp >= filter_settings.get("start_date"))
41 41 if filter_settings.get("end_date"):
42 42 q = q.filter(Log.timestamp <= filter_settings.get("end_date"))
43 43 if filter_settings.get("log_level"):
44 44 q = q.filter(Log.log_level == filter_settings.get("log_level").upper())
45 45 if filter_settings.get("request_id"):
46 46 request_id = filter_settings.get("request_id", "")
47 47 q = q.filter(Log.request_id == request_id.replace("-", ""))
48 48 if filter_settings.get("namespace"):
49 49 q = q.filter(Log.namespace == filter_settings.get("namespace"))
50 50 q = q.order_by(sa.desc(Log.timestamp))
51 51 return q
52 52
53 53 @classmethod
54 54 def es_query_builder(cls, app_ids, filter_settings):
55 55 if not filter_settings:
56 56 filter_settings = {}
57 57
58 58 query = {
59 59 "query": {
60 60 "bool": {
61 "filter": {"and": [{"terms": {"resource_id": list(app_ids)}}]}
61 "filter": [{"terms": {"resource_id": list(app_ids)}}]
62 62 }
63 63 }
64 64 }
65 65
66 66 start_date = filter_settings.get("start_date")
67 67 end_date = filter_settings.get("end_date")
68 filter_part = query["query"]["bool"]["filter"]["and"]
68 filter_part = query["query"]["bool"]["filter"]
69 69
70 70 for tag in filter_settings.get("tags", []):
71 71 tag_values = [v.lower() for v in tag["value"]]
72 72 key = "tags.%s.values" % tag["name"].replace(".", "_")
73 73 filter_part.append({"terms": {key: tag_values}})
74 74
75 75 date_range = {"range": {"timestamp": {}}}
76 76 if start_date:
77 77 date_range["range"]["timestamp"]["gte"] = start_date
78 78 if end_date:
79 79 date_range["range"]["timestamp"]["lte"] = end_date
80 80 if start_date or end_date:
81 81 filter_part.append(date_range)
82 82
83 83 levels = filter_settings.get("level")
84 84 if levels:
85 85 filter_part.append({"terms": {"log_level": levels}})
86 86 namespaces = filter_settings.get("namespace")
87 87 if namespaces:
88 88 filter_part.append({"terms": {"namespace": namespaces}})
89 89
90 90 request_ids = filter_settings.get("request_id")
91 91 if request_ids:
92 92 filter_part.append({"terms": {"request_id": request_ids}})
93 93
94 94 messages = filter_settings.get("message")
95 95 if messages:
96 96 query["query"]["bool"]["must"] = {
97 97 "match": {"message": {"query": " ".join(messages), "operator": "and"}}
98 98 }
99 99 return query
100 100
101 101 @classmethod
102 102 def get_time_series_aggregate(cls, app_ids=None, filter_settings=None):
103 103 if not app_ids:
104 104 return {}
105 105 es_query = cls.es_query_builder(app_ids, filter_settings)
106 106 es_query["aggs"] = {
107 107 "events_over_time": {
108 108 "date_histogram": {
109 109 "field": "timestamp",
110 110 "interval": "1h",
111 111 "min_doc_count": 0,
112 112 "extended_bounds": {
113 113 "max": filter_settings.get("end_date"),
114 114 "min": filter_settings.get("start_date"),
115 115 },
116 116 }
117 117 }
118 118 }
119 119 log.debug(es_query)
120 120 index_names = es_index_name_limiter(
121 121 filter_settings.get("start_date"),
122 122 filter_settings.get("end_date"),
123 123 ixtypes=["logs"],
124 124 )
125 125 if index_names:
126 126 results = Datastores.es.search(
127 127 body=es_query, index=index_names, doc_type="log", size=0
128 128 )
129 129 else:
130 130 results = []
131 131 return results
132 132
133 133 @classmethod
134 134 def get_search_iterator(
135 135 cls,
136 136 app_ids=None,
137 137 page=1,
138 138 items_per_page=50,
139 139 order_by=None,
140 140 filter_settings=None,
141 141 limit=None,
142 142 ):
143 143 if not app_ids:
144 144 return {}, 0
145 145
146 146 es_query = cls.es_query_builder(app_ids, filter_settings)
147 147 sort_query = {"sort": [{"timestamp": {"order": "desc"}}]}
148 148 es_query.update(sort_query)
149 149 log.debug(es_query)
150 150 es_from = (page - 1) * items_per_page
151 151 index_names = es_index_name_limiter(
152 152 filter_settings.get("start_date"),
153 153 filter_settings.get("end_date"),
154 154 ixtypes=["logs"],
155 155 )
156 156 if not index_names:
157 157 return {}, 0
158 158
159 159 results = Datastores.es.search(
160 160 body=es_query,
161 161 index=index_names,
162 162 doc_type="log",
163 163 size=items_per_page,
164 164 from_=es_from,
165 165 )
166 166 if results["hits"]["total"] > 5000:
167 167 count = 5000
168 168 else:
169 169 count = results["hits"]["total"]
170 170 return results["hits"], count
171 171
172 172 @classmethod
173 173 def get_paginator_by_app_ids(
174 174 cls,
175 175 app_ids=None,
176 176 page=1,
177 177 item_count=None,
178 178 items_per_page=50,
179 179 order_by=None,
180 180 filter_settings=None,
181 181 exclude_columns=None,
182 182 db_session=None,
183 183 ):
184 184 if not filter_settings:
185 185 filter_settings = {}
186 186 results, item_count = cls.get_search_iterator(
187 187 app_ids, page, items_per_page, order_by, filter_settings
188 188 )
189 189 paginator = paginate.Page(
190 190 [], item_count=item_count, items_per_page=items_per_page, **filter_settings
191 191 )
192 192 ordered_ids = tuple(
193 193 item["_source"]["pg_id"] for item in results.get("hits", [])
194 194 )
195 195
196 196 sorted_instance_list = []
197 197 if ordered_ids:
198 198 db_session = get_db_session(db_session)
199 199 query = db_session.query(Log)
200 200 query = query.filter(Log.log_id.in_(ordered_ids))
201 201 query = query.order_by(sa.desc("timestamp"))
202 202 sa_items = query.all()
203 203 # resort by score
204 204 for i_id in ordered_ids:
205 205 for item in sa_items:
206 206 if str(item.log_id) == str(i_id):
207 207 sorted_instance_list.append(item)
208 208 paginator.sa_items = sorted_instance_list
209 209 return paginator
210 210
211 211 @classmethod
212 212 def query_by_primary_key_and_namespace(cls, list_of_pairs, db_session=None):
213 213 db_session = get_db_session(db_session)
214 214 list_of_conditions = []
215 215 query = db_session.query(Log)
216 216 for pair in list_of_pairs:
217 217 list_of_conditions.append(
218 218 sa.and_(Log.primary_key == pair["pk"], Log.namespace == pair["ns"])
219 219 )
220 220 query = query.filter(sa.or_(*list_of_conditions))
221 221 query = query.order_by(sa.asc(Log.timestamp), sa.asc(Log.log_id))
222 222 return query
@@ -1,523 +1,519 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import logging
18 18 import paginate
19 19 import sqlalchemy as sa
20 20 import appenlight.lib.helpers as h
21 21
22 22 from datetime import datetime
23 23
24 24 from appenlight.models import get_db_session, Datastores
25 25 from appenlight.models.report import Report
26 26 from appenlight.models.report_group import ReportGroup
27 27 from appenlight.models.report_comment import ReportComment
28 28 from appenlight.models.user import User
29 29 from appenlight.models.services.base import BaseService
30 30 from appenlight.lib.enums import ReportType
31 31 from appenlight.lib.utils import es_index_name_limiter
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35
36 36 class ReportGroupService(BaseService):
37 37 @classmethod
38 38 def get_trending(cls, request, filter_settings, limit=15, db_session=None):
39 39 """
40 40 Returns report groups trending for a specific time interval
41 41 """
42 42 db_session = get_db_session(db_session)
43 43
44 44 tags = []
45 45 if filter_settings.get("tags"):
46 46 for tag in filter_settings["tags"]:
47 47 tags.append(
48 48 {"terms": {"tags.{}.values".format(tag["name"]): tag["value"]}}
49 49 )
50 50
51 51 index_names = es_index_name_limiter(
52 52 start_date=filter_settings["start_date"],
53 53 end_date=filter_settings["end_date"],
54 54 ixtypes=["reports"],
55 55 )
56 56
57 57 if not index_names or not filter_settings["resource"]:
58 58 return []
59 59
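# count report occurrences per group (tags.group_id.values) for the `limit` most frequent groups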
60 60 es_query = {
61 61 "aggs": {
62 62 "parent_agg": {
63 63 "aggs": {
64 64 "groups": {
65 65 "aggs": {
66 66 "sub_agg": {
67 67 "value_count": {"field": "tags.group_id.values"}
68 68 }
69 69 },
70 70 "filter": {"exists": {"field": "tags.group_id.values"}},
71 71 }
72 72 },
73 73 "terms": {"field": "tags.group_id.values", "size": limit},
74 74 }
75 75 },
76 76 "query": {
77 77 "bool": {
78 "filter": {
79 "and": [
80 {
81 "terms": {
82 "resource_id": [filter_settings["resource"][0]]
83 }
84 },
85 {
86 "range": {
87 "timestamp": {
88 "gte": filter_settings["start_date"],
89 "lte": filter_settings["end_date"],
90 }
78 "filter": [
79 {
80 "terms": {
81 "resource_id": [filter_settings["resource"][0]]
82 }
83 },
84 {
85 "range": {
86 "timestamp": {
87 "gte": filter_settings["start_date"],
88 "lte": filter_settings["end_date"],
91 89 }
92 },
93 ]
94 }
90 }
91 },
92 ]
95 93 }
96 94 },
97 95 }
98 96 if tags:
99 es_query["query"]["bool"]["filter"]["and"].extend(tags)
97 es_query["query"]["bool"]["filter"].extend(tags)
100 98
101 99 result = Datastores.es.search(
102 100 body=es_query, index=index_names, doc_type="log", size=0
103 101 )
104 102 series = []
105 103 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
106 104 series.append(
107 105 {"key": bucket["key"], "groups": bucket["groups"]["sub_agg"]["value"]}
108 106 )
109 107
110 108 report_groups_d = {}
111 109 for g in series:
112 110 report_groups_d[int(g["key"])] = g["groups"] or 0
113 111
114 112 query = db_session.query(ReportGroup)
115 113 query = query.filter(ReportGroup.id.in_(list(report_groups_d.keys())))
116 114 query = query.options(sa.orm.joinedload(ReportGroup.last_report_ref))
117 115 results = [(report_groups_d[group.id], group) for group in query]
118 116 return sorted(results, reverse=True, key=lambda x: x[0])
119 117
120 118 @classmethod
121 119 def get_search_iterator(
122 120 cls,
123 121 app_ids=None,
124 122 page=1,
125 123 items_per_page=50,
126 124 order_by=None,
127 125 filter_settings=None,
128 126 limit=None,
129 127 ):
130 128 if not app_ids:
131 129 return {}
132 130 if not filter_settings:
133 131 filter_settings = {}
134 132
135 133 query = {
136 134 "size": 0,
137 135 "query": {
138 136 "bool": {
139 137 "must": [],
140 138 "should": [],
141 "filter": {"and": [{"terms": {"resource_id": list(app_ids)}}]}
139 "filter": [{"terms": {"resource_id": list(app_ids)}}]
142 140 }
143 141 },
144 142 "aggs": {
145 143 "top_groups": {
146 144 "terms": {
147 145 "size": 5000,
148 146 "field": "_parent",
149 147 "order": {"newest": "desc"},
150 148 },
151 149 "aggs": {
152 150 "top_reports_hits": {
153 151 "top_hits": {"size": 1, "sort": {"start_time": "desc"}}
154 152 },
155 153 "newest": {"max": {"field": "start_time"}},
156 154 },
157 155 }
158 156 },
159 157 }
160 158
161 159 start_date = filter_settings.get("start_date")
162 160 end_date = filter_settings.get("end_date")
163 filter_part = query["query"]["bool"]["filter"]["and"]
161 filter_part = query["query"]["bool"]["filter"]
164 162 date_range = {"range": {"start_time": {}}}
165 163 if start_date:
166 164 date_range["range"]["start_time"]["gte"] = start_date
167 165 if end_date:
168 166 date_range["range"]["start_time"]["lte"] = end_date
169 167 if start_date or end_date:
170 168 filter_part.append(date_range)
171 169
172 170 priorities = filter_settings.get("priority")
173 171
174 172 for tag in filter_settings.get("tags", []):
175 173 tag_values = [v.lower() for v in tag["value"]]
176 174 key = "tags.%s.values" % tag["name"].replace(".", "_")
177 175 filter_part.append({"terms": {key: tag_values}})
178 176
179 177 if priorities:
180 178 filter_part.append(
181 179 {
182 180 "has_parent": {
183 181 "parent_type": "report_group",
184 182 "query": {"terms": {"priority": priorities}},
185 183 }
186 184 }
187 185 )
188 186
189 187 min_occurences = filter_settings.get("min_occurences")
190 188 if min_occurences:
191 189 filter_part.append(
192 190 {
193 191 "has_parent": {
194 192 "parent_type": "report_group",
195 193 "query": {"range": {"occurences": {"gte": min_occurences[0]}}},
196 194 }
197 195 }
198 196 )
199 197
200 198 min_duration = filter_settings.get("min_duration")
201 199 max_duration = filter_settings.get("max_duration")
202 200
203 201 request_ids = filter_settings.get("request_id")
204 202 if request_ids:
205 203 filter_part.append({"terms": {"request_id": request_ids}})
206 204
207 205 duration_range = {"range": {"average_duration": {}}}
208 206 if min_duration:
209 207 duration_range["range"]["average_duration"]["gte"] = min_duration[0]
210 208 if max_duration:
211 209 duration_range["range"]["average_duration"]["lte"] = max_duration[0]
212 210 if min_duration or max_duration:
213 211 filter_part.append(
214 212 {"has_parent": {"parent_type": "report_group", "query": duration_range}}
215 213 )
216 214
217 215 http_status = filter_settings.get("http_status")
218 216 report_type = filter_settings.get("report_type", [ReportType.error])
219 217 # set error report type if http status is not found
220 218 # and we are dealing with slow reports
221 219 if not http_status or ReportType.slow in report_type:
222 220 filter_part.append({"terms": {"report_type": report_type}})
223 221 if http_status:
224 222 filter_part.append({"terms": {"http_status": http_status}})
225 223
226 224 messages = filter_settings.get("message")
227 225 if messages:
228 226 condition = {"match": {"message": " ".join(messages)}}
229 227 query["query"]["bool"]["must"].append(condition)
230 228 errors = filter_settings.get("error")
231 229 if errors:
232 230 condition = {"match": {"error": " ".join(errors)}}
233 231 query["query"]["bool"]["must"].append(condition)
234 232 url_domains = filter_settings.get("url_domain")
235 233 if url_domains:
236 234 condition = {"terms": {"url_domain": url_domains}}
237 235 query["query"]["bool"]["must"].append(condition)
238 236 url_paths = filter_settings.get("url_path")
239 237 if url_paths:
240 238 condition = {"terms": {"url_path": url_paths}}
241 239 query["query"]["bool"]["must"].append(condition)
242 240
243 241 if filter_settings.get("report_status"):
244 242 for status in filter_settings.get("report_status"):
245 243 if status == "never_reviewed":
246 244 filter_part.append(
247 245 {
248 246 "has_parent": {
249 247 "parent_type": "report_group",
250 248 "query": {"term": {"read": False}},
251 249 }
252 250 }
253 251 )
254 252 elif status == "reviewed":
255 253 filter_part.append(
256 254 {
257 255 "has_parent": {
258 256 "parent_type": "report_group",
259 257 "query": {"term": {"read": True}},
260 258 }
261 259 }
262 260 )
263 261 elif status == "public":
264 262 filter_part.append(
265 263 {
266 264 "has_parent": {
267 265 "parent_type": "report_group",
268 266 "query": {"term": {"public": True}},
269 267 }
270 268 }
271 269 )
272 270 elif status == "fixed":
273 271 filter_part.append(
274 272 {
275 273 "has_parent": {
276 274 "parent_type": "report_group",
277 275 "query": {"term": {"fixed": True}},
278 276 }
279 277 }
280 278 )
281 279
282 280 # logging.getLogger('pyelasticsearch').setLevel(logging.DEBUG)
283 281 index_names = es_index_name_limiter(
284 282 filter_settings.get("start_date"),
285 283 filter_settings.get("end_date"),
286 284 ixtypes=["reports"],
287 285 )
288 286 if index_names:
289 287 results = Datastores.es.search(
290 288 body=query,
291 289 index=index_names,
292 290 doc_type=["report", "report_group"],
293 291 size=0,
294 292 )
295 293 else:
296 294 return []
297 295 return results["aggregations"]
298 296
299 297 @classmethod
300 298 def get_paginator_by_app_ids(
301 299 cls,
302 300 app_ids=None,
303 301 page=1,
304 302 item_count=None,
305 303 items_per_page=50,
306 304 order_by=None,
307 305 filter_settings=None,
308 306 exclude_columns=None,
309 307 db_session=None,
310 308 ):
311 309 if not filter_settings:
312 310 filter_settings = {}
313 311 results = cls.get_search_iterator(
314 312 app_ids, page, items_per_page, order_by, filter_settings
315 313 )
316 314
317 315 ordered_ids = []
318 316 if results:
319 317 for item in results["top_groups"]["buckets"]:
320 318 pg_id = item["top_reports_hits"]["hits"]["hits"][0]["_source"]["pg_id"]
321 319 ordered_ids.append(pg_id)
322 320 log.info(filter_settings)
323 321 paginator = paginate.Page(
324 322 ordered_ids, items_per_page=items_per_page, **filter_settings
325 323 )
326 324 sa_items = ()
327 325 if paginator.items:
328 326 db_session = get_db_session(db_session)
329 327 # latest report detail
330 328 query = db_session.query(Report)
331 329 query = query.options(sa.orm.joinedload(Report.report_group))
332 330 query = query.filter(Report.id.in_(paginator.items))
333 331 if filter_settings.get("order_col"):
334 332 order_col = filter_settings.get("order_col")
335 333 if filter_settings.get("order_dir") == "dsc":
336 334 sort_on = "desc"
337 335 else:
338 336 sort_on = "asc"
339 337 if order_col == "when":
340 338 order_col = "last_timestamp"
341 339 query = query.order_by(
342 340 getattr(sa, sort_on)(getattr(ReportGroup, order_col))
343 341 )
344 342 sa_items = query.all()
345 343 sorted_instance_list = []
346 344 for i_id in ordered_ids:
347 345 for report in sa_items:
348 346 if str(report.id) == i_id and report not in sorted_instance_list:
349 347 sorted_instance_list.append(report)
350 348 paginator.sa_items = sorted_instance_list
351 349 return paginator
352 350
353 351 @classmethod
354 352 def by_app_ids(cls, app_ids=None, order_by=True, db_session=None):
355 353 db_session = get_db_session(db_session)
356 354 q = db_session.query(ReportGroup)
357 355 if app_ids:
358 356 q = q.filter(ReportGroup.resource_id.in_(app_ids))
359 357 if order_by:
360 358 q = q.order_by(sa.desc(ReportGroup.id))
361 359 return q
362 360
363 361 @classmethod
364 362 def by_id(cls, group_id, app_ids=None, db_session=None):
365 363 db_session = get_db_session(db_session)
366 364 q = db_session.query(ReportGroup).filter(ReportGroup.id == int(group_id))
367 365 if app_ids:
368 366 q = q.filter(ReportGroup.resource_id.in_(app_ids))
369 367 return q.first()
370 368
371 369 @classmethod
372 370 def by_ids(cls, group_ids=None, db_session=None):
373 371 db_session = get_db_session(db_session)
374 372 query = db_session.query(ReportGroup)
375 373 query = query.filter(ReportGroup.id.in_(group_ids))
376 374 return query
377 375
378 376 @classmethod
379 377 def by_hash_and_resource(
380 378 cls, resource_id, grouping_hash, since_when=None, db_session=None
381 379 ):
382 380 db_session = get_db_session(db_session)
383 381 q = db_session.query(ReportGroup)
384 382 q = q.filter(ReportGroup.resource_id == resource_id)
385 383 q = q.filter(ReportGroup.grouping_hash == grouping_hash)
386 384 q = q.filter(ReportGroup.fixed == False)
387 385 if since_when:
388 386 q = q.filter(ReportGroup.first_timestamp >= since_when)
389 387 return q.first()
390 388
391 389 @classmethod
392 390 def users_commenting(cls, report_group, exclude_user_id=None, db_session=None):
393 391 db_session = get_db_session(None, report_group)
394 392 query = db_session.query(User).distinct()
395 393 query = query.filter(User.id == ReportComment.owner_id)
396 394 query = query.filter(ReportComment.group_id == report_group.id)
397 395 if exclude_user_id:
398 396 query = query.filter(ReportComment.owner_id != exclude_user_id)
399 397 return query
400 398
401 399 @classmethod
402 400 def affected_users_count(cls, report_group, db_session=None):
403 401 db_session = get_db_session(db_session)
404 402 query = db_session.query(sa.func.count(Report.username))
405 403 query = query.filter(Report.group_id == report_group.id)
406 404 query = query.filter(Report.username != "")
407 405 query = query.filter(Report.username != None)
408 406 query = query.group_by(Report.username)
409 407 return query.count()
410 408
411 409 @classmethod
412 410 def top_affected_users(cls, report_group, db_session=None):
413 411 db_session = get_db_session(db_session)
414 412 count_label = sa.func.count(Report.username).label("count")
415 413 query = db_session.query(Report.username, count_label)
416 414 query = query.filter(Report.group_id == report_group.id)
417 415 query = query.filter(Report.username != None)
418 416 query = query.filter(Report.username != "")
419 417 query = query.group_by(Report.username)
420 418 query = query.order_by(sa.desc(count_label))
421 419 query = query.limit(50)
422 420 return query
423 421
424 422 @classmethod
425 423 def get_report_stats(cls, request, filter_settings):
426 424 """
427 425 Gets report dashboard graphs
428 426 Returns information for BAR charts with occurrences/interval information;
429 427 the detailed variant returns time intervals, the non-detailed variant
430 428 returns the total sum
431 429 """
432 430 delta = filter_settings["end_date"] - filter_settings["start_date"]
433 431 if delta < h.time_deltas.get("12h")["delta"]:
434 432 interval = "1m"
435 433 elif delta <= h.time_deltas.get("3d")["delta"]:
436 434 interval = "5m"
437 435 elif delta >= h.time_deltas.get("2w")["delta"]:
438 436 interval = "24h"
439 437 else:
440 438 interval = "1h"
441 439
442 440 group_id = filter_settings.get("group_id")
443 441
444 442 es_query = {
445 443 "aggs": {
446 444 "parent_agg": {
447 445 "aggs": {
448 446 "types": {
449 447 "aggs": {
450 448 "sub_agg": {"terms": {"field": "tags.type.values"}}
451 449 },
452 450 "filter": {
453 451 "and": [{"exists": {"field": "tags.type.values"}}]
454 452 },
455 453 }
456 454 },
457 455 "date_histogram": {
458 456 "extended_bounds": {
459 457 "max": filter_settings["end_date"],
460 458 "min": filter_settings["start_date"],
461 459 },
462 460 "field": "timestamp",
463 461 "interval": interval,
464 462 "min_doc_count": 0,
465 463 },
466 464 }
467 465 },
468 466 "query": {
469 467 "bool": {
470 "filter": {
471 "and": [
472 {
473 "terms": {
474 "resource_id": [filter_settings["resource"][0]]
475 }
476 },
477 {
478 "range": {
479 "timestamp": {
480 "gte": filter_settings["start_date"],
481 "lte": filter_settings["end_date"],
482 }
468 "filter": [
469 {
470 "terms": {
471 "resource_id": [filter_settings["resource"][0]]
472 }
473 },
474 {
475 "range": {
476 "timestamp": {
477 "gte": filter_settings["start_date"],
478 "lte": filter_settings["end_date"],
483 479 }
484 },
485 ]
486 }
480 }
481 },
482 ]
487 483 }
488 484 },
489 485 }
490 486 if group_id:
491 487 parent_agg = es_query["aggs"]["parent_agg"]
492 488 filters = parent_agg["aggs"]["types"]["filter"]["and"]
493 489 filters.append({"terms": {"tags.group_id.values": [group_id]}})
494 490
495 491 index_names = es_index_name_limiter(
496 492 start_date=filter_settings["start_date"],
497 493 end_date=filter_settings["end_date"],
498 494 ixtypes=["reports"],
499 495 )
500 496
501 497 if not index_names:
502 498 return []
503 499
504 500 result = Datastores.es.search(
505 501 body=es_query, index=index_names, doc_type="log", size=0
506 502 )
507 503 series = []
508 504 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
509 505 point = {
510 506 "x": datetime.utcfromtimestamp(int(bucket["key"]) / 1000),
511 507 "report": 0,
512 508 "not_found": 0,
513 509 "slow_report": 0,
514 510 }
515 511 for subbucket in bucket["types"]["sub_agg"]["buckets"]:
516 512 if subbucket["key"] == "slow":
517 513 point["slow_report"] = subbucket["doc_count"]
518 514 elif subbucket["key"] == "error":
519 515 point["report"] = subbucket["doc_count"]
520 516 elif subbucket["key"] == "not_found":
521 517 point["not_found"] = subbucket["doc_count"]
522 518 series.append(point)
523 519 return series
@@ -1,63 +1,59 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from appenlight.models import Datastores
18 18 from appenlight.models.services.base import BaseService
19 19 from appenlight.lib.enums import ReportType
20 20 from appenlight.lib.utils import es_index_name_limiter
21 21
22 22
23 23 class ReportStatService(BaseService):
24 24 @classmethod
25 25 def count_by_type(cls, report_type, resource_id, since_when):
26 26 report_type = ReportType.key_from_value(report_type)
27 27
28 28 index_names = es_index_name_limiter(start_date=since_when, ixtypes=["reports"])
29 29
30 30 es_query = {
31 31 "aggs": {
32 32 "reports": {
33 33 "aggs": {
34 34 "sub_agg": {"value_count": {"field": "tags.group_id.values"}}
35 35 },
36 "filter": {
37 "and": [
38 {"terms": {"resource_id": [resource_id]}},
39 {"exists": {"field": "tags.group_id.values"}},
40 ]
41 },
36 "filter": [
37 {"terms": {"resource_id": [resource_id]}},
38 {"exists": {"field": "tags.group_id.values"}},
39 ],
42 40 }
43 41 },
44 42 "query": {
45 43 "bool": {
46 "filter": {
47 "and": [
48 {"terms": {"resource_id": [resource_id]}},
49 {"terms": {"tags.type.values": [report_type]}},
50 {"range": {"timestamp": {"gte": since_when}}},
51 ]
52 }
44 "filter": [
45 {"terms": {"resource_id": [resource_id]}},
46 {"terms": {"tags.type.values": [report_type]}},
47 {"range": {"timestamp": {"gte": since_when}}},
48 ]
53 49 }
54 50 },
55 51 }
56 52
57 53 if index_names:
58 54 result = Datastores.es.search(
59 55 body=es_query, index=index_names, doc_type="log", size=0
60 56 )
61 57 return result["aggregations"]["reports"]["sub_agg"]["value"]
62 58 else:
63 59 return 0
@@ -1,617 +1,607 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from datetime import datetime
18 18
19 19 import appenlight.lib.helpers as h
20 20 from appenlight.models import get_db_session, Datastores
21 21 from appenlight.models.services.base import BaseService
22 22 from appenlight.lib.enums import ReportType
23 23 from appenlight.lib.utils import es_index_name_limiter
24 24
25 25 try:
26 26 from ae_uptime_ce.models.services.uptime_metric import UptimeMetricService
27 27 except ImportError:
28 28 UptimeMetricService = None
29 29
30 30
31 31 def check_key(key, stats, uptime, total_seconds):
32 32 if key not in stats:
33 33 stats[key] = {
34 34 "name": key,
35 35 "requests": 0,
36 36 "errors": 0,
37 37 "tolerated_requests": 0,
38 38 "frustrating_requests": 0,
39 39 "satisfying_requests": 0,
40 40 "total_minutes": total_seconds / 60.0,
41 41 "uptime": uptime,
42 42 "apdex": 0,
43 43 "rpm": 0,
44 44 "response_time": 0,
45 45 "avg_response_time": 0,
46 46 }
47 47
48 48
49 49 class RequestMetricService(BaseService):
50 50 @classmethod
51 51 def get_metrics_stats(cls, request, filter_settings, db_session=None):
52 52 delta = filter_settings["end_date"] - filter_settings["start_date"]
53 53 if delta < h.time_deltas.get("12h")["delta"]:
54 54 interval = "1m"
55 55 elif delta <= h.time_deltas.get("3d")["delta"]:
56 56 interval = "5m"
57 57 elif delta >= h.time_deltas.get("2w")["delta"]:
58 58 interval = "24h"
59 59 else:
60 60 interval = "1h"
61 61
62 62 filter_settings["namespace"] = ["appenlight.request_metric"]
63 63
64 64 es_query = {
65 65 "aggs": {
66 66 "parent_agg": {
67 67 "aggs": {
68 68 "custom": {
69 69 "aggs": {
70 70 "sub_agg": {
71 71 "sum": {"field": "tags.custom.numeric_values"}
72 72 }
73 73 },
74 74 "filter": {
75 75 "exists": {"field": "tags.custom.numeric_values"}
76 76 },
77 77 },
78 78 "main": {
79 79 "aggs": {
80 80 "sub_agg": {
81 81 "sum": {"field": "tags.main.numeric_values"}
82 82 }
83 83 },
84 84 "filter": {"exists": {"field": "tags.main.numeric_values"}},
85 85 },
86 86 "nosql": {
87 87 "aggs": {
88 88 "sub_agg": {
89 89 "sum": {"field": "tags.nosql.numeric_values"}
90 90 }
91 91 },
92 92 "filter": {
93 93 "exists": {"field": "tags.nosql.numeric_values"}
94 94 },
95 95 },
96 96 "remote": {
97 97 "aggs": {
98 98 "sub_agg": {
99 99 "sum": {"field": "tags.remote.numeric_values"}
100 100 }
101 101 },
102 102 "filter": {
103 103 "exists": {"field": "tags.remote.numeric_values"}
104 104 },
105 105 },
106 106 "requests": {
107 107 "aggs": {
108 108 "sub_agg": {
109 109 "sum": {"field": "tags.requests.numeric_values"}
110 110 }
111 111 },
112 112 "filter": {
113 113 "exists": {"field": "tags.requests.numeric_values"}
114 114 },
115 115 },
116 116 "sql": {
117 117 "aggs": {
118 118 "sub_agg": {"sum": {"field": "tags.sql.numeric_values"}}
119 119 },
120 120 "filter": {"exists": {"field": "tags.sql.numeric_values"}},
121 121 },
122 122 "tmpl": {
123 123 "aggs": {
124 124 "sub_agg": {
125 125 "sum": {"field": "tags.tmpl.numeric_values"}
126 126 }
127 127 },
128 128 "filter": {"exists": {"field": "tags.tmpl.numeric_values"}},
129 129 },
130 130 },
131 131 "date_histogram": {
132 132 "extended_bounds": {
133 133 "max": filter_settings["end_date"],
134 134 "min": filter_settings["start_date"],
135 135 },
136 136 "field": "timestamp",
137 137 "interval": interval,
138 138 "min_doc_count": 0,
139 139 },
140 140 }
141 141 },
142 142 "query": {
143 143 "bool": {
144 "filter": {
145 "and": [
146 {
147 "terms": {
148 "resource_id": [filter_settings["resource"][0]]
149 }
150 },
151 {
152 "range": {
153 "timestamp": {
154 "gte": filter_settings["start_date"],
155 "lte": filter_settings["end_date"],
156 }
144 "filter": [
145 {
146 "terms": {
147 "resource_id": [filter_settings["resource"][0]]
148 }
149 },
150 {
151 "range": {
152 "timestamp": {
153 "gte": filter_settings["start_date"],
154 "lte": filter_settings["end_date"],
157 155 }
158 },
159 {"terms": {"namespace": ["appenlight.request_metric"]}},
160 ]
161 }
156 }
157 },
158 {"terms": {"namespace": ["appenlight.request_metric"]}},
159 ]
162 160 }
163 161 },
164 162 }
165 163
166 164 index_names = es_index_name_limiter(
167 165 start_date=filter_settings["start_date"],
168 166 end_date=filter_settings["end_date"],
169 167 ixtypes=["metrics"],
170 168 )
171 169 if not index_names:
172 170 return []
173 171
174 172 result = Datastores.es.search(
175 173 body=es_query, index=index_names, doc_type="log", size=0
176 174 )
177 175
178 176 plot_data = []
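# date_histogram bucket keys are epoch milliseconds, hence the division by 1000 below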
179 177 for item in result["aggregations"]["parent_agg"]["buckets"]:
180 178 x_time = datetime.utcfromtimestamp(int(item["key"]) / 1000)
181 179 point = {"x": x_time}
182 180 for key in ["custom", "main", "nosql", "remote", "requests", "sql", "tmpl"]:
183 181 value = item[key]["sub_agg"]["value"]
184 182 point[key] = round(value, 3) if value else 0
185 183 plot_data.append(point)
186 184
187 185 return plot_data
188 186
189 187 @classmethod
190 188 def get_requests_breakdown(cls, request, filter_settings, db_session=None):
191 189 db_session = get_db_session(db_session)
192 190
193 191 # fetch total time of all requests in this time range
194 192 index_names = es_index_name_limiter(
195 193 start_date=filter_settings["start_date"],
196 194 end_date=filter_settings["end_date"],
197 195 ixtypes=["metrics"],
198 196 )
199 197
200 198 if index_names and filter_settings["resource"]:
201 199 es_query = {
202 200 "aggs": {
203 201 "main": {
204 202 "aggs": {
205 203 "sub_agg": {"sum": {"field": "tags.main.numeric_values"}}
206 204 },
207 205 "filter": {"exists": {"field": "tags.main.numeric_values"}},
208 206 }
209 207 },
210 208 "query": {
211 209 "bool": {
212 "filter": {
213 "and": [
214 {
215 "terms": {
216 "resource_id": [filter_settings["resource"][0]]
217 }
218 },
219 {
220 "range": {
221 "timestamp": {
222 "gte": filter_settings["start_date"],
223 "lte": filter_settings["end_date"],
224 }
210 "filter": [
211 {
212 "terms": {
213 "resource_id": [filter_settings["resource"][0]]
214 }
215 },
216 {
217 "range": {
218 "timestamp": {
219 "gte": filter_settings["start_date"],
220 "lte": filter_settings["end_date"],
225 221 }
226 },
227 {"terms": {"namespace": ["appenlight.request_metric"]}},
228 ]
229 }
222 }
223 },
224 {"terms": {"namespace": ["appenlight.request_metric"]}},
225 ]
230 226 }
231 227 },
232 228 }
233 229 result = Datastores.es.search(
234 230 body=es_query, index=index_names, doc_type="log", size=0
235 231 )
236 232 total_time_spent = result["aggregations"]["main"]["sub_agg"]["value"]
237 233 else:
238 234 total_time_spent = 0
239 235 script_text = "doc['tags.main.numeric_values'].value / {}".format(
240 236 total_time_spent
241 237 )
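# expression script turns each view's summed main time into a fraction of the total;
# note that total_time_spent can still be 0 at this point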
242 238
243 239 if index_names and filter_settings["resource"]:
244 240 es_query = {
245 241 "aggs": {
246 242 "parent_agg": {
247 243 "aggs": {
248 244 "main": {
249 245 "aggs": {
250 246 "sub_agg": {
251 247 "sum": {"field": "tags.main.numeric_values"}
252 248 }
253 249 },
254 250 "filter": {
255 251 "exists": {"field": "tags.main.numeric_values"}
256 252 },
257 253 },
258 254 "percentage": {
259 255 "aggs": {
260 256 "sub_agg": {
261 257 "sum": {
262 258 "lang": "expression",
263 259 "script": script_text,
264 260 }
265 261 }
266 262 },
267 263 "filter": {
268 264 "exists": {"field": "tags.main.numeric_values"}
269 265 },
270 266 },
271 267 "requests": {
272 268 "aggs": {
273 269 "sub_agg": {
274 270 "sum": {"field": "tags.requests.numeric_values"}
275 271 }
276 272 },
277 273 "filter": {
278 274 "exists": {"field": "tags.requests.numeric_values"}
279 275 },
280 276 },
281 277 },
282 278 "terms": {
283 279 "field": "tags.view_name.values",
284 280 "order": {"percentage>sub_agg": "desc"},
285 281 "size": 15,
286 282 },
287 283 }
288 284 },
289 285 "query": {
290 286 "bool": {
291 "filter": {
292 "and": [
293 {
294 "terms": {
295 "resource_id": [filter_settings["resource"][0]]
296 }
297 },
298 {
299 "range": {
300 "timestamp": {
301 "gte": filter_settings["start_date"],
302 "lte": filter_settings["end_date"],
303 }
287 "filter": [
288 {
289 "terms": {
290 "resource_id": [filter_settings["resource"][0]]
291 }
292 },
293 {
294 "range": {
295 "timestamp": {
296 "gte": filter_settings["start_date"],
297 "lte": filter_settings["end_date"],
304 298 }
305 },
306 ]
307 }
299 }
300 },
301 ]
308 302 }
309 303 },
310 304 }
311 305 result = Datastores.es.search(
312 306 body=es_query, index=index_names, doc_type="log", size=0
313 307 )
314 308 series = result["aggregations"]["parent_agg"]["buckets"]
315 309 else:
316 310 series = []
317 311
318 312 and_part = [
319 313 {"term": {"resource_id": filter_settings["resource"][0]}},
320 314 {"terms": {"tags.view_name.values": [row["key"] for row in series]}},
321 315 {"term": {"report_type": str(ReportType.slow)}},
322 316 ]
323 317 query = {
324 318 "aggs": {
325 319 "top_reports": {
326 320 "terms": {"field": "tags.view_name.values", "size": len(series)},
327 321 "aggs": {
328 322 "top_calls_hits": {
329 323 "top_hits": {"sort": {"start_time": "desc"}, "size": 5}
330 324 }
331 325 },
332 326 }
333 327 },
334 "query": {"bool": {"filter": {"and": and_part}}},
328 "query": {"bool": {"filter": and_part}},
335 329 }
336 330 details = {}
337 331 index_names = es_index_name_limiter(ixtypes=["reports"])
338 332 if index_names and series:
339 333 result = Datastores.es.search(
340 334 body=query, doc_type="report", size=0, index=index_names
341 335 )
342 336 for bucket in result["aggregations"]["top_reports"]["buckets"]:
343 337 details[bucket["key"]] = []
344 338
345 339 for hit in bucket["top_calls_hits"]["hits"]["hits"]:
346 340 details[bucket["key"]].append(
347 341 {
348 342 "report_id": hit["_source"]["pg_id"],
349 343 "group_id": hit["_source"]["group_id"],
350 344 }
351 345 )
352 346
353 347 results = []
354 348 for row in series:
355 349 result = {
356 350 "key": row["key"],
357 351 "main": row["main"]["sub_agg"]["value"],
358 352 "requests": row["requests"]["sub_agg"]["value"],
359 353 }
360 354 # es can return 'infinity'
361 355 try:
362 356 result["percentage"] = float(row["percentage"]["sub_agg"]["value"])
363 357 except ValueError:
364 358 result["percentage"] = 0
365 359
366 360 result["latest_details"] = details.get(row["key"]) or []
367 361 results.append(result)
368 362
369 363 return results
370 364
371 365 @classmethod
372 366 def get_apdex_stats(cls, request, filter_settings, threshold=1, db_session=None):
373 367 """
374 368 Calculates the Apdex score per server and returns the information
375 369 shown in the dashboard server boxes (upper right stats)
376 370 """
377 371 # Apdex t = (Satisfied Count + Tolerated Count / 2) / Total Samples
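# e.g. 800 satisfied and 100 tolerated requests out of 1000 samples -> (800 + 100 / 2) / 1000 = 0.85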
378 372 db_session = get_db_session(db_session)
379 373 index_names = es_index_name_limiter(
380 374 start_date=filter_settings["start_date"],
381 375 end_date=filter_settings["end_date"],
382 376 ixtypes=["metrics"],
383 377 )
384 378
385 379 requests_series = []
386 380
387 381 if index_names and filter_settings["resource"]:
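# split requests per server into Apdex classes using tags.main (assumed to be response
# time in seconds): frustrating >= 4, tolerated between 1 and 4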
388 382 es_query = {
389 383 "aggs": {
390 384 "parent_agg": {
391 385 "aggs": {
392 386 "frustrating": {
393 387 "aggs": {
394 388 "sub_agg": {
395 389 "sum": {"field": "tags.requests.numeric_values"}
396 390 }
397 391 },
398 392 "filter": {
399 393 "and": [
400 394 {
401 395 "range": {
402 396 "tags.main.numeric_values": {"gte": "4"}
403 397 }
404 398 },
405 399 {
406 400 "exists": {
407 401 "field": "tags.requests.numeric_values"
408 402 }
409 403 },
410 404 ]
411 405 },
412 406 },
413 407 "main": {
414 408 "aggs": {
415 409 "sub_agg": {
416 410 "sum": {"field": "tags.main.numeric_values"}
417 411 }
418 412 },
419 413 "filter": {
420 414 "exists": {"field": "tags.main.numeric_values"}
421 415 },
422 416 },
423 417 "requests": {
424 418 "aggs": {
425 419 "sub_agg": {
426 420 "sum": {"field": "tags.requests.numeric_values"}
427 421 }
428 422 },
429 423 "filter": {
430 424 "exists": {"field": "tags.requests.numeric_values"}
431 425 },
432 426 },
433 427 "tolerated": {
434 428 "aggs": {
435 429 "sub_agg": {
436 430 "sum": {"field": "tags.requests.numeric_values"}
437 431 }
438 432 },
439 433 "filter": {
440 434 "and": [
441 435 {
442 436 "range": {
443 437 "tags.main.numeric_values": {"gte": "1"}
444 438 }
445 439 },
446 440 {
447 441 "range": {
448 442 "tags.main.numeric_values": {"lt": "4"}
449 443 }
450 444 },
451 445 {
452 446 "exists": {
453 447 "field": "tags.requests.numeric_values"
454 448 }
455 449 },
456 450 ]
457 451 },
458 452 },
459 453 },
460 454 "terms": {"field": "tags.server_name.values", "size": 999999},
461 455 }
462 456 },
463 457 "query": {
464 458 "bool": {
465 "filter": {
466 "and": [
467 {
468 "terms": {
469 "resource_id": [filter_settings["resource"][0]]
470 }
471 },
472 {
473 "range": {
474 "timestamp": {
475 "gte": filter_settings["start_date"],
476 "lte": filter_settings["end_date"],
477 }
459 "filter": [
460 {
461 "terms": {
462 "resource_id": [filter_settings["resource"][0]]
463 }
464 },
465 {
466 "range": {
467 "timestamp": {
468 "gte": filter_settings["start_date"],
469 "lte": filter_settings["end_date"],
478 470 }
479 },
480 {"terms": {"namespace": ["appenlight.request_metric"]}},
481 ]
482 }
471 }
472 },
473 {"terms": {"namespace": ["appenlight.request_metric"]}},
474 ]
483 475 }
484 476 },
485 477 }
486 478
487 479 result = Datastores.es.search(
488 480 body=es_query, index=index_names, doc_type="log", size=0
489 481 )
490 482 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
491 483 requests_series.append(
492 484 {
493 485 "frustrating": bucket["frustrating"]["sub_agg"]["value"],
494 486 "main": bucket["main"]["sub_agg"]["value"],
495 487 "requests": bucket["requests"]["sub_agg"]["value"],
496 488 "tolerated": bucket["tolerated"]["sub_agg"]["value"],
497 489 "key": bucket["key"],
498 490 }
499 491 )
500 492
501 493 since_when = filter_settings["start_date"]
502 494 until = filter_settings["end_date"]
503 495
504 496 # total errors
505 497
506 498 index_names = es_index_name_limiter(
507 499 start_date=filter_settings["start_date"],
508 500 end_date=filter_settings["end_date"],
509 501 ixtypes=["reports"],
510 502 )
511 503
512 504 report_series = []
513 505 if index_names and filter_settings["resource"]:
514 506 report_type = ReportType.key_from_value(ReportType.error)
515 507 es_query = {
516 508 "aggs": {
517 509 "parent_agg": {
518 510 "aggs": {
519 511 "errors": {
520 512 "aggs": {
521 513 "sub_agg": {
522 514 "sum": {
523 515 "field": "tags.occurences.numeric_values"
524 516 }
525 517 }
526 518 },
527 519 "filter": {
528 520 "and": [
529 521 {"terms": {"tags.type.values": [report_type]}},
530 522 {
531 523 "exists": {
532 524 "field": "tags.occurences.numeric_values"
533 525 }
534 526 },
535 527 ]
536 528 },
537 529 }
538 530 },
539 531 "terms": {"field": "tags.server_name.values", "size": 999999},
540 532 }
541 533 },
542 534 "query": {
543 535 "bool": {
544 "filter": {
545 "and": [
546 {
547 "terms": {
548 "resource_id": [filter_settings["resource"][0]]
549 }
550 },
551 {
552 "range": {
553 "timestamp": {
554 "gte": filter_settings["start_date"],
555 "lte": filter_settings["end_date"],
556 }
536 "filter": [
537 {
538 "terms": {
539 "resource_id": [filter_settings["resource"][0]]
540 }
541 },
542 {
543 "range": {
544 "timestamp": {
545 "gte": filter_settings["start_date"],
546 "lte": filter_settings["end_date"],
557 547 }
558 },
559 {"terms": {"namespace": ["appenlight.error"]}},
560 ]
561 }
548 }
549 },
550 {"terms": {"namespace": ["appenlight.error"]}},
551 ]
562 552 }
563 553 },
564 554 }
565 555 result = Datastores.es.search(
566 556 body=es_query, index=index_names, doc_type="log", size=0
567 557 )
568 558 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
569 559 report_series.append(
570 560 {
571 561 "key": bucket["key"],
572 562 "errors": bucket["errors"]["sub_agg"]["value"],
573 563 }
574 564 )
575 565
576 566 stats = {}
577 567 if UptimeMetricService is not None:
578 568 uptime = UptimeMetricService.get_uptime_by_app(
579 569 filter_settings["resource"][0], since_when=since_when, until=until
580 570 )
581 571 else:
582 572 uptime = 0
583 573
584 574 total_seconds = (until - since_when).total_seconds()
585 575
586 576 for stat in requests_series:
587 577 check_key(stat["key"], stats, uptime, total_seconds)
588 578 stats[stat["key"]]["requests"] = int(stat["requests"])
589 579 stats[stat["key"]]["response_time"] = stat["main"]
590 580 stats[stat["key"]]["tolerated_requests"] = stat["tolerated"]
591 581 stats[stat["key"]]["frustrating_requests"] = stat["frustrating"]
592 582 for server in report_series:
593 583 check_key(server["key"], stats, uptime, total_seconds)
594 584 stats[server["key"]]["errors"] = server["errors"]
595 585
596 586 server_stats = list(stats.values())
597 587 for stat in server_stats:
598 588 stat["satisfying_requests"] = (
599 589 stat["requests"]
600 590 - stat["errors"]
601 591 - stat["frustrating_requests"]
602 592 - stat["tolerated_requests"]
603 593 )
604 594 if stat["satisfying_requests"] < 0:
605 595 stat["satisfying_requests"] = 0
606 596
607 597 if stat["requests"]:
608 598 stat["avg_response_time"] = round(
609 599 stat["response_time"] / stat["requests"], 3
610 600 )
611 601 qual_requests = (
612 602 stat["satisfying_requests"] + stat["tolerated_requests"] / 2.0
613 603 )
614 604 stat["apdex"] = round((qual_requests / stat["requests"]) * 100, 2)
615 605 stat["rpm"] = round(stat["requests"] / stat["total_minutes"], 2)
616 606
617 607 return sorted(server_stats, key=lambda x: x["name"])
@@ -1,186 +1,182 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from appenlight.models import get_db_session, Datastores
18 18 from appenlight.models.report import Report
19 19 from appenlight.models.services.base import BaseService
20 20 from appenlight.lib.utils import es_index_name_limiter
21 21
22 22
23 23 class SlowCallService(BaseService):
24 24 @classmethod
25 25 def get_time_consuming_calls(cls, request, filter_settings, db_session=None):
26 26 db_session = get_db_session(db_session)
27 27 # get slow calls from older partitions too
28 28 index_names = es_index_name_limiter(
29 29 start_date=filter_settings["start_date"],
30 30 end_date=filter_settings["end_date"],
31 31 ixtypes=["slow_calls"],
32 32 )
33 33 if index_names and filter_settings["resource"]:
34 34 # get the statement hashes with the longest total duration
35 35 es_query = {
36 36 "aggs": {
37 37 "parent_agg": {
38 38 "aggs": {
39 39 "duration": {
40 40 "aggs": {
41 41 "sub_agg": {
42 42 "sum": {"field": "tags.duration.numeric_values"}
43 43 }
44 44 },
45 45 "filter": {
46 46 "exists": {"field": "tags.duration.numeric_values"}
47 47 },
48 48 },
49 49 "total": {
50 50 "aggs": {
51 51 "sub_agg": {
52 52 "value_count": {
53 53 "field": "tags.statement_hash.values"
54 54 }
55 55 }
56 56 },
57 57 "filter": {
58 58 "exists": {"field": "tags.statement_hash.values"}
59 59 },
60 60 },
61 61 },
62 62 "terms": {
63 63 "field": "tags.statement_hash.values",
64 64 "order": {"duration>sub_agg": "desc"},
65 65 "size": 15,
66 66 },
67 67 }
68 68 },
69 69 "query": {
70 70 "bool": {
71 "filter": {
72 "and": [
73 {
74 "terms": {
75 "resource_id": [filter_settings["resource"][0]]
76 }
77 },
78 {
79 "range": {
80 "timestamp": {
81 "gte": filter_settings["start_date"],
82 "lte": filter_settings["end_date"],
83 }
71 "filter": [
72 {
73 "terms": {
74 "resource_id": [filter_settings["resource"][0]]
75 }
76 },
77 {
78 "range": {
79 "timestamp": {
80 "gte": filter_settings["start_date"],
81 "lte": filter_settings["end_date"],
84 82 }
85 },
86 ]
87 }
83 }
84 },
85 ]
88 86 }
89 87 },
90 88 }
91 89 result = Datastores.es.search(
92 90 body=es_query, index=index_names, doc_type="log", size=0
93 91 )
94 92 results = result["aggregations"]["parent_agg"]["buckets"]
95 93 else:
96 94 return []
97 95 hashes = [i["key"] for i in results]
98 96
99 97 # get queries associated with hashes
100 98 calls_query = {
101 99 "aggs": {
102 100 "top_calls": {
103 101 "terms": {"field": "tags.statement_hash.values", "size": 15},
104 102 "aggs": {
105 103 "top_calls_hits": {
106 104 "top_hits": {"sort": {"timestamp": "desc"}, "size": 5}
107 105 }
108 106 },
109 107 }
110 108 },
111 109 "query": {
112 110 "bool": {
113 "filter": {
114 "and": [
115 {
116 "terms": {
117 "resource_id": [filter_settings["resource"][0]]
118 }
119 },
120 {"terms": {"tags.statement_hash.values": hashes}},
121 {
122 "range": {
123 "timestamp": {
124 "gte": filter_settings["start_date"],
125 "lte": filter_settings["end_date"],
126 }
111 "filter": [
112 {
113 "terms": {
114 "resource_id": [filter_settings["resource"][0]]
115 }
116 },
117 {"terms": {"tags.statement_hash.values": hashes}},
118 {
119 "range": {
120 "timestamp": {
121 "gte": filter_settings["start_date"],
122 "lte": filter_settings["end_date"],
127 123 }
128 },
129 ]
130 }
124 }
125 },
126 ]
131 127 }
132 128 },
133 129 }
134 130 calls = Datastores.es.search(
135 131 body=calls_query, index=index_names, doc_type="log", size=0
136 132 )
137 133 call_results = {}
138 134 report_ids = []
139 135 for call in calls["aggregations"]["top_calls"]["buckets"]:
140 136 hits = call["top_calls_hits"]["hits"]["hits"]
141 137 call_results[call["key"]] = [i["_source"] for i in hits]
142 138 report_ids.extend(
143 139 [i["_source"]["tags"]["report_id"]["values"] for i in hits]
144 140 )
145 141 if report_ids:
146 142 r_query = db_session.query(Report.group_id, Report.id)
147 143 r_query = r_query.filter(Report.id.in_(report_ids))
148 144 r_query = r_query.filter(Report.start_time >= filter_settings["start_date"])
149 145 else:
150 146 r_query = []
151 147 reports_reversed = {}
152 148 for report in r_query:
153 149 reports_reversed[report.id] = report.group_id
154 150
155 151 final_results = []
156 152 for item in results:
157 153 if item["key"] not in call_results:
158 154 continue
159 155 call = call_results[item["key"]][0]
160 156 row = {
161 157 "occurences": item["total"]["sub_agg"]["value"],
162 158 "total_duration": round(item["duration"]["sub_agg"]["value"]),
163 159 "statement": call["message"],
164 160 "statement_type": call["tags"]["type"]["values"],
165 161 "statement_subtype": call["tags"]["subtype"]["values"],
166 162 "statement_hash": item["key"],
167 163 "latest_details": [],
168 164 }
169 165 if row["statement_type"] in ["tmpl", "remote"]:
170 166 params = (
171 167 call["tags"]["parameters"]["values"]
172 168 if "parameters" in call["tags"]
173 169 else ""
174 170 )
175 171 row["statement"] = "{} ({})".format(call["message"], params)
176 172 for call in call_results[item["key"]]:
177 173 report_id = call["tags"]["report_id"]["values"]
178 174 group_id = reports_reversed.get(report_id)
179 175 if group_id:
180 176 row["latest_details"].append(
181 177 {"group_id": group_id, "report_id": report_id}
182 178 )
183 179
184 180 final_results.append(row)
185 181
186 182 return final_results
@@ -1,222 +1,220 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import logging
18 18 from datetime import datetime, timedelta
19 19
20 20 from pyramid.view import view_config
21 21 from pyramid.httpexceptions import HTTPUnprocessableEntity
22 22 from appenlight.models import Datastores, Log
23 23 from appenlight.models.services.log import LogService
24 24 from appenlight.lib.utils import (
25 25 build_filter_settings_from_query_dict,
26 26 es_index_name_limiter,
27 27 )
28 28 from appenlight.lib.helpers import gen_pagination_headers
29 29 from appenlight.celery.tasks import logs_cleanup
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33 section_filters_key = "appenlight:logs:filter:%s"
34 34
35 35
36 36 @view_config(route_name="logs_no_id", renderer="json", permission="authenticated")
37 37 def fetch_logs(request):
38 38 """
39 39 Returns a list of log entries from Elasticsearch
40 40 """
41 41
42 42 filter_settings = build_filter_settings_from_query_dict(
43 43 request, request.GET.mixed()
44 44 )
45 45 logs_paginator = LogService.get_paginator_by_app_ids(
46 46 app_ids=filter_settings["resource"],
47 47 page=filter_settings["page"],
48 48 filter_settings=filter_settings,
49 49 )
50 50 headers = gen_pagination_headers(request, logs_paginator)
51 51 request.response.headers.update(headers)
52 52
53 53 return [l.get_dict() for l in logs_paginator.sa_items]
54 54
55 55
56 56 @view_config(
57 57 route_name="section_view",
58 58 match_param=["section=logs_section", "view=fetch_series"],
59 59 renderer="json",
60 60 permission="authenticated",
61 61 )
62 62 def logs_fetch_series(request):
63 63 """
64 64 Handles metric dashboard graphs
65 65 Returns information for time/tier breakdown
66 66 """
67 67 filter_settings = build_filter_settings_from_query_dict(
68 68 request, request.GET.mixed()
69 69 )
70 70 paginator = LogService.get_paginator_by_app_ids(
71 71 app_ids=filter_settings["resource"],
72 72 page=1,
73 73 filter_settings=filter_settings,
74 74 items_per_page=1,
75 75 )
76 76 now = datetime.utcnow().replace(microsecond=0, second=0)
77 77 delta = timedelta(days=7)
78 78 if paginator.sa_items:
79 79 start_date = paginator.sa_items[-1].timestamp.replace(microsecond=0, second=0)
80 80 filter_settings["start_date"] = start_date - delta
81 81 else:
82 82 filter_settings["start_date"] = now - delta
83 83 filter_settings["end_date"] = filter_settings["start_date"] + timedelta(days=7)
84 84
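# cache the aggregation below for 30 seconds in the redis_sec_30 region, keyed on the arguments of `cached`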
85 85 @request.registry.cache_regions.redis_sec_30.cache_on_arguments("logs_graphs")
86 86 def cached(apps, search_params, delta, now):
87 87 data = LogService.get_time_series_aggregate(
88 88 filter_settings["resource"], filter_settings
89 89 )
90 90 if not data:
91 91 return []
92 92 buckets = data["aggregations"]["events_over_time"]["buckets"]
93 93 return [
94 94 {
95 95 "x": datetime.utcfromtimestamp(item["key"] / 1000),
96 96 "logs": item["doc_count"],
97 97 }
98 98 for item in buckets
99 99 ]
100 100
101 101 return cached(filter_settings, request.GET.mixed(), delta, now)
102 102
103 103
104 104 @view_config(
105 105 route_name="logs_no_id",
106 106 renderer="json",
107 107 request_method="DELETE",
108 108 permission="authenticated",
109 109 )
110 110 def logs_mass_delete(request):
111 111 params = request.GET.mixed()
112 112 if "resource" not in params:
113 113 raise HTTPUnprocessableEntity()
114 114 # this might be '' and then colander will not validate the schema
115 115 if not params.get("namespace"):
116 116 params.pop("namespace", None)
117 117 filter_settings = build_filter_settings_from_query_dict(
118 118 request, params, resource_permissions=["update_reports"]
119 119 )
120 120
121 121 resource_id = list(filter_settings["resource"])[0]
122 122 # filter settings return a list of all of the user's applications;
123 123 # if the app does not match we would normally not care, since it is only used for search,
124 124 # but here a user playing with the params could wipe out their whole dataset
125 125 if int(resource_id) != int(params["resource"]):
126 126 raise HTTPUnprocessableEntity()
127 127
128 128 logs_cleanup.delay(resource_id, filter_settings)
129 129 msg = (
130 130 "Log cleanup process started - it may take a while for "
131 131 "everything to get removed"
132 132 )
133 133 request.session.flash(msg)
134 134 return {}
135 135
136 136
137 137 @view_config(
138 138 route_name="section_view",
139 139 match_param=("view=common_tags", "section=logs_section"),
140 140 renderer="json",
141 141 permission="authenticated",
142 142 )
143 143 def common_tags(request):
144 144 config = request.GET.mixed()
145 145 filter_settings = build_filter_settings_from_query_dict(request, config)
146 146
147 147 resources = list(filter_settings["resource"])
148 148 query = {
149 149 "query": {
150 150 "bool": {
151 "filter": {"and": [{"terms": {"resource_id": list(resources)}}]}
151 "filter": [{"terms": {"resource_id": list(resources)}}]
152 152 }
153 153 }
154 154 }
155 155 start_date = filter_settings.get("start_date")
156 156 end_date = filter_settings.get("end_date")
157 filter_part = query["query"]["bool"]["filter"]["and"]
157 filter_part = query["query"]["bool"]["filter"]
158 158
159 159 date_range = {"range": {"timestamp": {}}}
160 160 if start_date:
161 161 date_range["range"]["timestamp"]["gte"] = start_date
162 162 if end_date:
163 163 date_range["range"]["timestamp"]["lte"] = end_date
164 164 if start_date or end_date:
165 165 filter_part.append(date_range)
166 166
167 167 levels = filter_settings.get("level")
168 168 if levels:
169 169 filter_part.append({"terms": {"log_level": levels}})
170 170 namespaces = filter_settings.get("namespace")
171 171 if namespaces:
172 172 filter_part.append({"terms": {"namespace": namespaces}})
173 173
174 174 query["aggs"] = {"sub_agg": {"terms": {"field": "tag_list", "size": 50}}}
175 175 # tags
176 176 index_names = es_index_name_limiter(ixtypes=[config.get("datasource", "logs")])
177 177 result = Datastores.es.search(body=query, index=index_names, doc_type="log", size=0)
178 178 tag_buckets = result["aggregations"]["sub_agg"].get("buckets", [])
179 179 # namespaces
180 180 query["aggs"] = {"sub_agg": {"terms": {"field": "namespace", "size": 50}}}
181 181 result = Datastores.es.search(body=query, index=index_names, doc_type="log", size=0)
182 182 namespaces_buckets = result["aggregations"]["sub_agg"].get("buckets", [])
183 183 return {
184 184 "tags": [item["key"] for item in tag_buckets],
185 185 "namespaces": [item["key"] for item in namespaces_buckets],
186 186 }
187 187
188 188
189 189 @view_config(
190 190 route_name="section_view",
191 191 match_param=("view=common_values", "section=logs_section"),
192 192 renderer="json",
193 193 permission="authenticated",
194 194 )
195 195 def common_values(request):
196 196 config = request.GET.mixed()
197 197 datasource = config.pop("datasource", "logs")
198 198 filter_settings = build_filter_settings_from_query_dict(request, config)
199 199 resources = list(filter_settings["resource"])
200 200 tag_name = filter_settings["tags"][0]["value"][0]
201 201
202 202 and_part = [
203 203 {"terms": {"resource_id": list(resources)}},
204 204 ]
205 205 if filter_settings["namespace"]:
206 206 and_part.append({"terms": {"namespace": filter_settings["namespace"]}})
207 207 query = {
208 208 "query": {
209 209 "bool": {
210 "filter": {
211 "and": and_part
212 }
210 "filter": and_part
213 211 }
214 212 }
215 213 }
216 214 query["aggs"] = {
217 215 "sub_agg": {"terms": {"field": "tags.{}.values".format(tag_name), "size": 50}}
218 216 }
219 217 index_names = es_index_name_limiter(ixtypes=[datasource])
220 218 result = Datastores.es.search(body=query, index=index_names, doc_type="log", size=0)
221 219 values_buckets = result["aggregations"]["sub_agg"].get("buckets", [])
222 220 return {"values": [item["key"] for item in values_buckets]}
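The pattern applied throughout the files above: Elasticsearch 5.x removed the legacy "and"/"or" compound filters, so a bool query's "filter" clause now takes a plain list of clauses. A minimal sketch of the shape change, with a made-up resource id and an illustrative helper that is not part of the changeset:

old_style = {
    "query": {
        "bool": {"filter": {"and": [{"terms": {"resource_id": [123]}}]}}
    }
}
new_style = {
    "query": {
        "bool": {"filter": [{"terms": {"resource_id": [123]}}]}
    }
}


def normalize_bool_filter(query):
    # rewrite a legacy {"and": [...]} bool filter into the ES 5.x list form
    bool_part = query.get("query", {}).get("bool", {})
    filter_part = bool_part.get("filter")
    if isinstance(filter_part, dict) and "and" in filter_part:
        bool_part["filter"] = list(filter_part["and"])
    return query


assert normalize_bool_filter(old_style) == new_style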