elasticsearch: replace "filtered" with "bool" clause
ergo -
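
Background for the diff below: the "filtered" query was deprecated in Elasticsearch 2.0 and removed in 5.0. Its non-scoring "filter" part maps onto the "bool" query's "filter" clause, and its scoring "query" part maps onto "must". A minimal before/after sketch of the rewrite this commit applies throughout; the field names and values here are illustrative, not taken from the files below:

# Old form (Elasticsearch 1.x / 2.x): scoring query plus non-scoring filter
# wrapped in a "filtered" query.
old_query = {
    "query": {
        "filtered": {
            "query": {"match": {"message": "timeout"}},
            "filter": {"term": {"resource_id": 1}},
        }
    }
}

# New form: the same search expressed as a "bool" query. The scoring part
# moves under "must", the non-scoring part under "filter".
new_query = {
    "query": {
        "bool": {
            "must": {"match": {"message": "timeout"}},
            "filter": {"term": {"resource_id": 1}},
        }
    }
}
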
@@ -1,708 +1,708 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import bisect
18 18 import collections
19 19 import math
20 20 from datetime import datetime, timedelta
21 21
22 22 import sqlalchemy as sa
23 23 import elasticsearch.exceptions
24 24 import elasticsearch.helpers
25 25
26 26 from celery.utils.log import get_task_logger
27 27 from zope.sqlalchemy import mark_changed
28 28 from pyramid.threadlocal import get_current_request, get_current_registry
29 29 from ziggurat_foundations.models.services.resource import ResourceService
30 30
31 31 from appenlight.celery import celery
32 32 from appenlight.models.report_group import ReportGroup
33 33 from appenlight.models import DBSession, Datastores
34 34 from appenlight.models.report import Report
35 35 from appenlight.models.log import Log
36 36 from appenlight.models.metric import Metric
37 37 from appenlight.models.event import Event
38 38
39 39 from appenlight.models.services.application import ApplicationService
40 40 from appenlight.models.services.event import EventService
41 41 from appenlight.models.services.log import LogService
42 42 from appenlight.models.services.report import ReportService
43 43 from appenlight.models.services.report_group import ReportGroupService
44 44 from appenlight.models.services.user import UserService
45 45 from appenlight.models.tag import Tag
46 46 from appenlight.lib import print_traceback
47 47 from appenlight.lib.utils import parse_proto, in_batches
48 48 from appenlight.lib.ext_json import json
49 49 from appenlight.lib.redis_keys import REDIS_KEYS
50 50 from appenlight.lib.enums import ReportType
51 51
52 52 log = get_task_logger(__name__)
53 53
54 54 sample_boundries = (
55 55 list(range(100, 1000, 100))
56 56 + list(range(1000, 10000, 1000))
57 57 + list(range(10000, 100000, 5000))
58 58 )
59 59
60 60
61 61 def pick_sample(total_occurences, report_type=None):
62 62 every = 1.0
63 63 position = bisect.bisect_left(sample_boundries, total_occurences)
64 64 if position > 0:
65 65 if report_type == ReportType.not_found:
66 66 divide = 10.0
67 67 else:
68 68 divide = 100.0
69 69 every = sample_boundries[position - 1] / divide
70 70 return total_occurences % every == 0
71 71
72 72
73 73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
74 74 def test_exception_task():
75 75 log.error("test celery log", extra={"location": "celery"})
76 76 log.warning("test celery log", extra={"location": "celery"})
77 77 raise Exception("Celery exception test")
78 78
79 79
80 80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
81 81 def test_retry_exception_task():
82 82 try:
83 83 import time
84 84
85 85 time.sleep(1.3)
86 86 log.error("test retry celery log", extra={"location": "celery"})
87 87 log.warning("test retry celery log", extra={"location": "celery"})
88 88 raise Exception("Celery exception test")
89 89 except Exception as exc:
90 90 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
91 91 raise
92 92 test_retry_exception_task.retry(exc=exc)
93 93
94 94
95 95 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
96 96 def add_reports(resource_id, request_params, dataset, **kwargs):
97 97 proto_version = parse_proto(request_params.get("protocol_version", ""))
98 98 current_time = datetime.utcnow().replace(second=0, microsecond=0)
99 99 try:
100 100 # we will store solr docs here for single insert
101 101 es_report_docs = {}
102 102 es_report_group_docs = {}
103 103 resource = ApplicationService.by_id(resource_id)
104 104
105 105 tags = []
106 106 es_slow_calls_docs = {}
107 107 es_reports_stats_rows = {}
108 108 for report_data in dataset:
109 109 # build report details for later
110 110 added_details = 0
111 111 report = Report()
112 112 report.set_data(report_data, resource, proto_version)
113 113 report._skip_ft_index = True
114 114
115 115 # find latest group in this months partition
116 116 report_group = ReportGroupService.by_hash_and_resource(
117 117 report.resource_id,
118 118 report.grouping_hash,
119 119 since_when=datetime.utcnow().date().replace(day=1),
120 120 )
121 121 occurences = report_data.get("occurences", 1)
122 122 if not report_group:
123 123 # total reports will be +1 moment later
124 124 report_group = ReportGroup(
125 125 grouping_hash=report.grouping_hash,
126 126 occurences=0,
127 127 total_reports=0,
128 128 last_report=0,
129 129 priority=report.priority,
130 130 error=report.error,
131 131 first_timestamp=report.start_time,
132 132 )
133 133 report_group._skip_ft_index = True
134 134 report_group.report_type = report.report_type
135 135 report.report_group_time = report_group.first_timestamp
136 136 add_sample = pick_sample(
137 137 report_group.occurences, report_type=report_group.report_type
138 138 )
139 139 if add_sample:
140 140 resource.report_groups.append(report_group)
141 141 report_group.reports.append(report)
142 142 added_details += 1
143 143 DBSession.flush()
144 144 if report.partition_id not in es_report_docs:
145 145 es_report_docs[report.partition_id] = []
146 146 es_report_docs[report.partition_id].append(report.es_doc())
147 147 tags.extend(list(report.tags.items()))
148 148 slow_calls = report.add_slow_calls(report_data, report_group)
149 149 DBSession.flush()
150 150 for s_call in slow_calls:
151 151 if s_call.partition_id not in es_slow_calls_docs:
152 152 es_slow_calls_docs[s_call.partition_id] = []
153 153 es_slow_calls_docs[s_call.partition_id].append(s_call.es_doc())
154 154 # try generating new stat rows if needed
155 155 else:
156 156 # required for postprocessing to not fail later
157 157 report.report_group = report_group
158 158
159 159 stat_row = ReportService.generate_stat_rows(report, resource, report_group)
160 160 if stat_row.partition_id not in es_reports_stats_rows:
161 161 es_reports_stats_rows[stat_row.partition_id] = []
162 162 es_reports_stats_rows[stat_row.partition_id].append(stat_row.es_doc())
163 163
164 164 # see if we should mark 10th occurence of report
165 165 last_occurences_10 = int(math.floor(report_group.occurences / 10))
166 166 curr_occurences_10 = int(
167 167 math.floor((report_group.occurences + report.occurences) / 10)
168 168 )
169 169 last_occurences_100 = int(math.floor(report_group.occurences / 100))
170 170 curr_occurences_100 = int(
171 171 math.floor((report_group.occurences + report.occurences) / 100)
172 172 )
173 173 notify_occurences_10 = last_occurences_10 != curr_occurences_10
174 174 notify_occurences_100 = last_occurences_100 != curr_occurences_100
175 175 report_group.occurences = ReportGroup.occurences + occurences
176 176 report_group.last_timestamp = report.start_time
177 177 report_group.summed_duration = ReportGroup.summed_duration + report.duration
178 178 summed_duration = ReportGroup.summed_duration + report.duration
179 179 summed_occurences = ReportGroup.occurences + occurences
180 180 report_group.average_duration = summed_duration / summed_occurences
181 181 report_group.run_postprocessing(report)
182 182 if added_details:
183 183 report_group.total_reports = ReportGroup.total_reports + 1
184 184 report_group.last_report = report.id
185 185 report_group.set_notification_info(
186 186 notify_10=notify_occurences_10, notify_100=notify_occurences_100
187 187 )
188 188 DBSession.flush()
189 189 report_group.get_report().notify_channel(report_group)
190 190 if report_group.partition_id not in es_report_group_docs:
191 191 es_report_group_docs[report_group.partition_id] = []
192 192 es_report_group_docs[report_group.partition_id].append(
193 193 report_group.es_doc()
194 194 )
195 195
196 196 action = "REPORT"
197 197 log_msg = "%s: %s %s, client: %s, proto: %s" % (
198 198 action,
199 199 report_data.get("http_status", "unknown"),
200 200 str(resource),
201 201 report_data.get("client"),
202 202 proto_version,
203 203 )
204 204 log.info(log_msg)
205 205 total_reports = len(dataset)
206 206 redis_pipeline = Datastores.redis.pipeline(transaction=False)
207 207 key = REDIS_KEYS["counters"]["reports_per_minute"].format(current_time)
208 208 redis_pipeline.incr(key, total_reports)
209 209 redis_pipeline.expire(key, 3600 * 24)
210 210 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
211 211 resource.owner_user_id, current_time
212 212 )
213 213 redis_pipeline.incr(key, total_reports)
214 214 redis_pipeline.expire(key, 3600)
215 215 key = REDIS_KEYS["counters"]["reports_per_hour_per_app"].format(
216 216 resource_id, current_time.replace(minute=0)
217 217 )
218 218 redis_pipeline.incr(key, total_reports)
219 219 redis_pipeline.expire(key, 3600 * 24 * 7)
220 220 redis_pipeline.sadd(
221 221 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
222 222 current_time.replace(minute=0)
223 223 ),
224 224 resource_id,
225 225 )
226 226 redis_pipeline.execute()
227 227
228 228 add_reports_es(es_report_group_docs, es_report_docs)
229 229 add_reports_slow_calls_es(es_slow_calls_docs)
230 230 add_reports_stats_rows_es(es_reports_stats_rows)
231 231 return True
232 232 except Exception as exc:
233 233 print_traceback(log)
234 234 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
235 235 raise
236 236 add_reports.retry(exc=exc)
237 237
238 238
239 239 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
240 240 def add_reports_es(report_group_docs, report_docs):
241 241 for k, v in report_group_docs.items():
242 242 to_update = {"_index": k, "_type": "report_group"}
243 243 [i.update(to_update) for i in v]
244 244 elasticsearch.helpers.bulk(Datastores.es, v)
245 245 for k, v in report_docs.items():
246 246 to_update = {"_index": k, "_type": "report"}
247 247 [i.update(to_update) for i in v]
248 248 elasticsearch.helpers.bulk(Datastores.es, v)
249 249
250 250
251 251 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
252 252 def add_reports_slow_calls_es(es_docs):
253 253 for k, v in es_docs.items():
254 254 to_update = {"_index": k, "_type": "log"}
255 255 [i.update(to_update) for i in v]
256 256 elasticsearch.helpers.bulk(Datastores.es, v)
257 257
258 258
259 259 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
260 260 def add_reports_stats_rows_es(es_docs):
261 261 for k, v in es_docs.items():
262 262 to_update = {"_index": k, "_type": "log"}
263 263 [i.update(to_update) for i in v]
264 264 elasticsearch.helpers.bulk(Datastores.es, v)
265 265
266 266
267 267 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
268 268 def add_logs(resource_id, request_params, dataset, **kwargs):
269 269 proto_version = request_params.get("protocol_version")
270 270 current_time = datetime.utcnow().replace(second=0, microsecond=0)
271 271
272 272 try:
273 273 es_docs = collections.defaultdict(list)
274 274 resource = ApplicationService.by_id_cached()(resource_id)
275 275 resource = DBSession.merge(resource, load=False)
276 276 ns_pairs = []
277 277 for entry in dataset:
278 278 # gather pk and ns so we can remove older versions of row later
279 279 if entry["primary_key"] is not None:
280 280 ns_pairs.append({"pk": entry["primary_key"], "ns": entry["namespace"]})
281 281 log_entry = Log()
282 282 log_entry.set_data(entry, resource=resource)
283 283 log_entry._skip_ft_index = True
284 284 resource.logs.append(log_entry)
285 285 DBSession.flush()
286 286 # insert non pk rows first
287 287 if entry["primary_key"] is None:
288 288 es_docs[log_entry.partition_id].append(log_entry.es_doc())
289 289
290 290 # 2nd pass to delete all log entries from db for same pk/ns pair
291 291 if ns_pairs:
292 292 ids_to_delete = []
293 293 es_docs = collections.defaultdict(list)
294 294 es_docs_to_delete = collections.defaultdict(list)
295 295 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
296 296 list_of_pairs=ns_pairs
297 297 )
298 298 log_dict = {}
299 299 for log_entry in found_pkey_logs:
300 300 log_key = (log_entry.primary_key, log_entry.namespace)
301 301 if log_key not in log_dict:
302 302 log_dict[log_key] = []
303 303 log_dict[log_key].append(log_entry)
304 304
305 305 for ns, entry_list in log_dict.items():
306 306 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
307 307 # newest row needs to be indexed in es
308 308 log_entry = entry_list[-1]
309 309 # delete everything from pg and ES, leave the last row in pg
310 310 for e in entry_list[:-1]:
311 311 ids_to_delete.append(e.log_id)
312 312 es_docs_to_delete[e.partition_id].append(e.delete_hash)
313 313
314 314 es_docs_to_delete[log_entry.partition_id].append(log_entry.delete_hash)
315 315
316 316 es_docs[log_entry.partition_id].append(log_entry.es_doc())
317 317
318 318 if ids_to_delete:
319 319 query = DBSession.query(Log).filter(Log.log_id.in_(ids_to_delete))
320 320 query.delete(synchronize_session=False)
321 321 if es_docs_to_delete:
322 322 # batch this to avoid problems with default ES bulk limits
323 323 for es_index in es_docs_to_delete.keys():
324 324 for batch in in_batches(es_docs_to_delete[es_index], 20):
325 325 query = {"query": {"terms": {"delete_hash": batch}}}
326 326
327 327 try:
328 328 Datastores.es.transport.perform_request(
329 329 "DELETE",
330 330 "/{}/{}/_query".format(es_index, "log"),
331 331 body=query,
332 332 )
333 333 except elasticsearch.exceptions.NotFoundError as exc:
334 334 msg = "skipping index {}".format(es_index)
335 335 log.info(msg)
336 336
337 337 total_logs = len(dataset)
338 338
339 339 log_msg = "LOG_NEW: %s, entries: %s, proto:%s" % (
340 340 str(resource),
341 341 total_logs,
342 342 proto_version,
343 343 )
344 344 log.info(log_msg)
345 345 # mark_changed(session)
346 346 redis_pipeline = Datastores.redis.pipeline(transaction=False)
347 347 key = REDIS_KEYS["counters"]["logs_per_minute"].format(current_time)
348 348 redis_pipeline.incr(key, total_logs)
349 349 redis_pipeline.expire(key, 3600 * 24)
350 350 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
351 351 resource.owner_user_id, current_time
352 352 )
353 353 redis_pipeline.incr(key, total_logs)
354 354 redis_pipeline.expire(key, 3600)
355 355 key = REDIS_KEYS["counters"]["logs_per_hour_per_app"].format(
356 356 resource_id, current_time.replace(minute=0)
357 357 )
358 358 redis_pipeline.incr(key, total_logs)
359 359 redis_pipeline.expire(key, 3600 * 24 * 7)
360 360 redis_pipeline.sadd(
361 361 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
362 362 current_time.replace(minute=0)
363 363 ),
364 364 resource_id,
365 365 )
366 366 redis_pipeline.execute()
367 367 add_logs_es(es_docs)
368 368 return True
369 369 except Exception as exc:
370 370 print_traceback(log)
371 371 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
372 372 raise
373 373 add_logs.retry(exc=exc)
374 374
375 375
376 376 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
377 377 def add_logs_es(es_docs):
378 378 for k, v in es_docs.items():
379 379 to_update = {"_index": k, "_type": "log"}
380 380 [i.update(to_update) for i in v]
381 381 elasticsearch.helpers.bulk(Datastores.es, v)
382 382
383 383
384 384 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
385 385 def add_metrics(resource_id, request_params, dataset, proto_version):
386 386 current_time = datetime.utcnow().replace(second=0, microsecond=0)
387 387 try:
388 388 resource = ApplicationService.by_id_cached()(resource_id)
389 389 resource = DBSession.merge(resource, load=False)
390 390 es_docs = []
391 391 rows = []
392 392 for metric in dataset:
393 393 tags = dict(metric["tags"])
394 394 server_n = tags.get("server_name", metric["server_name"]).lower()
395 395 tags["server_name"] = server_n or "unknown"
396 396 new_metric = Metric(
397 397 timestamp=metric["timestamp"],
398 398 resource_id=resource.resource_id,
399 399 namespace=metric["namespace"],
400 400 tags=tags,
401 401 )
402 402 rows.append(new_metric)
403 403 es_docs.append(new_metric.es_doc())
404 404 session = DBSession()
405 405 session.bulk_save_objects(rows)
406 406 session.flush()
407 407
408 408 action = "METRICS"
409 409 metrics_msg = "%s: %s, metrics: %s, proto:%s" % (
410 410 action,
411 411 str(resource),
412 412 len(dataset),
413 413 proto_version,
414 414 )
415 415 log.info(metrics_msg)
416 416
417 417 mark_changed(session)
418 418 redis_pipeline = Datastores.redis.pipeline(transaction=False)
419 419 key = REDIS_KEYS["counters"]["metrics_per_minute"].format(current_time)
420 420 redis_pipeline.incr(key, len(rows))
421 421 redis_pipeline.expire(key, 3600 * 24)
422 422 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
423 423 resource.owner_user_id, current_time
424 424 )
425 425 redis_pipeline.incr(key, len(rows))
426 426 redis_pipeline.expire(key, 3600)
427 427 key = REDIS_KEYS["counters"]["metrics_per_hour_per_app"].format(
428 428 resource_id, current_time.replace(minute=0)
429 429 )
430 430 redis_pipeline.incr(key, len(rows))
431 431 redis_pipeline.expire(key, 3600 * 24 * 7)
432 432 redis_pipeline.sadd(
433 433 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
434 434 current_time.replace(minute=0)
435 435 ),
436 436 resource_id,
437 437 )
438 438 redis_pipeline.execute()
439 439 add_metrics_es(es_docs)
440 440 return True
441 441 except Exception as exc:
442 442 print_traceback(log)
443 443 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
444 444 raise
445 445 add_metrics.retry(exc=exc)
446 446
447 447
448 448 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
449 449 def add_metrics_es(es_docs):
450 450 for doc in es_docs:
451 451 partition = "rcae_m_%s" % doc["timestamp"].strftime("%Y_%m_%d")
452 452 Datastores.es.index(partition, "log", doc)
453 453
454 454
455 455 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
456 456 def check_user_report_notifications(resource_id):
457 457 since_when = datetime.utcnow()
458 458 try:
459 459 request = get_current_request()
460 460 application = ApplicationService.by_id(resource_id)
461 461 if not application:
462 462 return
463 463 error_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
464 464 ReportType.error, resource_id
465 465 )
466 466 slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
467 467 ReportType.slow, resource_id
468 468 )
469 469 error_group_ids = Datastores.redis.smembers(error_key)
470 470 slow_group_ids = Datastores.redis.smembers(slow_key)
471 471 Datastores.redis.delete(error_key)
472 472 Datastores.redis.delete(slow_key)
473 473 err_gids = [int(g_id) for g_id in error_group_ids]
474 474 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
475 475 group_ids = err_gids + slow_gids
476 476 occurence_dict = {}
477 477 for g_id in group_ids:
478 478 key = REDIS_KEYS["counters"]["report_group_occurences"].format(g_id)
479 479 val = Datastores.redis.get(key)
480 480 Datastores.redis.delete(key)
481 481 if val:
482 482 occurence_dict[g_id] = int(val)
483 483 else:
484 484 occurence_dict[g_id] = 1
485 485 report_groups = ReportGroupService.by_ids(group_ids)
486 486 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
487 487
488 488 ApplicationService.check_for_groups_alert(
489 489 application,
490 490 "alert",
491 491 report_groups=report_groups,
492 492 occurence_dict=occurence_dict,
493 493 )
494 494 users = set(
495 495 [p.user for p in ResourceService.users_for_perm(application, "view")]
496 496 )
497 497 report_groups = report_groups.all()
498 498 for user in users:
499 499 UserService.report_notify(
500 500 user,
501 501 request,
502 502 application,
503 503 report_groups=report_groups,
504 504 occurence_dict=occurence_dict,
505 505 )
506 506 for group in report_groups:
507 507 # marks report_groups as notified
508 508 if not group.notified:
509 509 group.notified = True
510 510 except Exception as exc:
511 511 print_traceback(log)
512 512 raise
513 513
514 514
515 515 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
516 516 def check_alerts(resource_id):
517 517 since_when = datetime.utcnow()
518 518 try:
519 519 request = get_current_request()
520 520 application = ApplicationService.by_id(resource_id)
521 521 if not application:
522 522 return
523 523 error_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
524 524 ReportType.error, resource_id
525 525 )
526 526 slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
527 527 ReportType.slow, resource_id
528 528 )
529 529 error_group_ids = Datastores.redis.smembers(error_key)
530 530 slow_group_ids = Datastores.redis.smembers(slow_key)
531 531 Datastores.redis.delete(error_key)
532 532 Datastores.redis.delete(slow_key)
533 533 err_gids = [int(g_id) for g_id in error_group_ids]
534 534 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
535 535 group_ids = err_gids + slow_gids
536 536 occurence_dict = {}
537 537 for g_id in group_ids:
538 538 key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format(
539 539 g_id
540 540 )
541 541 val = Datastores.redis.get(key)
542 542 Datastores.redis.delete(key)
543 543 if val:
544 544 occurence_dict[g_id] = int(val)
545 545 else:
546 546 occurence_dict[g_id] = 1
547 547 report_groups = ReportGroupService.by_ids(group_ids)
548 548 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
549 549
550 550 ApplicationService.check_for_groups_alert(
551 551 application,
552 552 "alert",
553 553 report_groups=report_groups,
554 554 occurence_dict=occurence_dict,
555 555 since_when=since_when,
556 556 )
557 557 except Exception as exc:
558 558 print_traceback(log)
559 559 raise
560 560
561 561
562 562 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
563 563 def close_alerts():
564 564 log.warning("Checking alerts")
565 565 since_when = datetime.utcnow()
566 566 try:
567 567 event_types = [
568 568 Event.types["error_report_alert"],
569 569 Event.types["slow_report_alert"],
570 570 ]
571 571 statuses = [Event.statuses["active"]]
572 572 # get events older than 5 min
573 573 events = EventService.by_type_and_status(
574 574 event_types, statuses, older_than=(since_when - timedelta(minutes=5))
575 575 )
576 576 for event in events:
577 577 # see if we can close them
578 578 event.validate_or_close(since_when=(since_when - timedelta(minutes=1)))
579 579 except Exception as exc:
580 580 print_traceback(log)
581 581 raise
582 582
583 583
584 584 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
585 585 def update_tag_counter(tag_name, tag_value, count):
586 586 try:
587 587 query = (
588 588 DBSession.query(Tag)
589 589 .filter(Tag.name == tag_name)
590 590 .filter(
591 591 sa.cast(Tag.value, sa.types.TEXT)
592 592 == sa.cast(json.dumps(tag_value), sa.types.TEXT)
593 593 )
594 594 )
595 595 query.update(
596 596 {"times_seen": Tag.times_seen + count, "last_timestamp": datetime.utcnow()},
597 597 synchronize_session=False,
598 598 )
599 599 session = DBSession()
600 600 mark_changed(session)
601 601 return True
602 602 except Exception as exc:
603 603 print_traceback(log)
604 604 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
605 605 raise
606 606 update_tag_counter.retry(exc=exc)
607 607
608 608
609 609 @celery.task(queue="default")
610 610 def update_tag_counters():
611 611 """
612 612 Sets task to update counters for application tags
613 613 """
614 614 tags = Datastores.redis.lrange(REDIS_KEYS["seen_tag_list"], 0, -1)
615 615 Datastores.redis.delete(REDIS_KEYS["seen_tag_list"])
616 616 c = collections.Counter(tags)
617 617 for t_json, count in c.items():
618 618 tag_info = json.loads(t_json)
619 619 update_tag_counter.delay(tag_info[0], tag_info[1], count)
620 620
621 621
622 622 @celery.task(queue="default")
623 623 def daily_digest():
624 624 """
625 625 Sends daily digest with top 50 error reports
626 626 """
627 627 request = get_current_request()
628 628 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
629 629 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
630 630 since_when = datetime.utcnow() - timedelta(hours=8)
631 631 log.warning("Generating daily digests")
632 632 for resource_id in apps:
633 633 resource_id = resource_id.decode("utf8")
634 634 end_date = datetime.utcnow().replace(microsecond=0, second=0)
635 635 filter_settings = {
636 636 "resource": [resource_id],
637 637 "tags": [{"name": "type", "value": ["error"], "op": None}],
638 638 "type": "error",
639 639 "start_date": since_when,
640 640 "end_date": end_date,
641 641 }
642 642
643 643 reports = ReportGroupService.get_trending(
644 644 request, filter_settings=filter_settings, limit=50
645 645 )
646 646
647 647 application = ApplicationService.by_id(resource_id)
648 648 if application:
649 649 users = set(
650 650 [p.user for p in ResourceService.users_for_perm(application, "view")]
651 651 )
652 652 for user in users:
653 653 user.send_digest(
654 654 request, application, reports=reports, since_when=since_when
655 655 )
656 656
657 657
658 658 @celery.task(queue="default")
659 659 def notifications_reports():
660 660 """
661 661 Loop that checks redis for info and then issues new tasks to celery to
662 662 issue notifications
663 663 """
664 664 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
665 665 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
666 666 for app in apps:
667 667 log.warning("Notify for app: %s" % app)
668 668 check_user_report_notifications.delay(app.decode("utf8"))
669 669
670 670
671 671 @celery.task(queue="default")
672 672 def alerting_reports():
673 673 """
674 674 Loop that checks redis for info and then issues new tasks to celery to
675 675 perform the following:
676 676 - which applications should have new alerts opened
677 677 """
678 678
679 679 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports_alerting"])
680 680 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports_alerting"])
681 681 for app in apps:
682 682 log.warning("Notify for app: %s" % app)
683 683 check_alerts.delay(app.decode("utf8"))
684 684
685 685
686 686 @celery.task(
687 687 queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4, max_retries=144
688 688 )
689 689 def logs_cleanup(resource_id, filter_settings):
690 690 request = get_current_request()
691 691 request.tm.begin()
692 692 es_query = {
693 693 "query": {
694 "filtered": {"filter": {"and": [{"term": {"resource_id": resource_id}}]}}
694 "bool": {"filter": {"and": [{"term": {"resource_id": resource_id}}]}}
695 695 }
696 696 }
697 697
698 698 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
699 699 if filter_settings["namespace"]:
700 700 query = query.filter(Log.namespace == filter_settings["namespace"][0])
701 es_query["query"]["filtered"]["filter"]["and"].append(
701 es_query["query"]["bool"]["filter"]["and"].append(
702 702 {"term": {"namespace": filter_settings["namespace"][0]}}
703 703 )
704 704 query.delete(synchronize_session=False)
705 705 request.tm.commit()
706 706 Datastores.es.transport.perform_request(
707 707 "DELETE", "/{}/{}/_query".format("rcae_l_*", "log"), body=es_query
708 708 )
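
Note that the hunks above keep the legacy {"and": [...]} wrapper inside the new "bool" filter, which Elasticsearch 2.x still accepts; on 5.x and later the "and" filter is gone and "bool"/"filter" takes a plain list of clauses. A hedged sketch of that variant for the logs_cleanup query, assuming a 5.x+ cluster (resource_id and filter_settings are the task arguments shown above):

es_query = {
    "query": {
        "bool": {
            # "filter" accepts a list of clauses directly, no "and" wrapper needed
            "filter": [{"term": {"resource_id": resource_id}}]
        }
    }
}
if filter_settings["namespace"]:
    # appended only when a namespace filter was requested
    es_query["query"]["bool"]["filter"].append(
        {"term": {"namespace": filter_settings["namespace"][0]}}
    )
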
@@ -1,533 +1,533 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from datetime import datetime, timedelta
18 18 import math
19 19 import uuid
20 20 import hashlib
21 21 import copy
22 22 import urllib.parse
23 23 import logging
24 24 import sqlalchemy as sa
25 25
26 26 from appenlight.models import Base, Datastores
27 27 from appenlight.lib.utils.date_utils import convert_date
28 28 from appenlight.lib.utils import convert_es_type
29 29 from appenlight.models.slow_call import SlowCall
30 30 from appenlight.lib.utils import channelstream_request
31 31 from appenlight.lib.enums import ReportType, Language
32 32 from pyramid.threadlocal import get_current_registry, get_current_request
33 33 from sqlalchemy.dialects.postgresql import JSON
34 34 from ziggurat_foundations.models.base import BaseModel
35 35
36 36 log = logging.getLogger(__name__)
37 37
38 38 REPORT_TYPE_MATRIX = {
39 39 "http_status": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
40 40 "group:priority": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
41 41 "duration": {"type": "float", "ops": ("ge", "le")},
42 42 "url_domain": {
43 43 "type": "unicode",
44 44 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
45 45 },
46 46 "url_path": {
47 47 "type": "unicode",
48 48 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
49 49 },
50 50 "error": {
51 51 "type": "unicode",
52 52 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
53 53 },
54 54 "tags:server_name": {
55 55 "type": "unicode",
56 56 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
57 57 },
58 58 "traceback": {"type": "unicode", "ops": ("contains",)},
59 59 "group:occurences": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
60 60 }
61 61
62 62
63 63 class Report(Base, BaseModel):
64 64 __tablename__ = "reports"
65 65 __table_args__ = {"implicit_returning": False}
66 66
67 67 id = sa.Column(sa.Integer, nullable=False, primary_key=True)
68 68 group_id = sa.Column(
69 69 sa.BigInteger,
70 70 sa.ForeignKey("reports_groups.id", ondelete="cascade", onupdate="cascade"),
71 71 )
72 72 resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
73 73 report_type = sa.Column(sa.Integer(), nullable=False, index=True)
74 74 error = sa.Column(sa.UnicodeText(), index=True)
75 75 extra = sa.Column(JSON(), default={})
76 76 request = sa.Column(JSON(), nullable=False, default={})
77 77 ip = sa.Column(sa.String(39), index=True, default="")
78 78 username = sa.Column(sa.Unicode(255), default="")
79 79 user_agent = sa.Column(sa.Unicode(255), default="")
80 80 url = sa.Column(sa.UnicodeText(), index=True)
81 81 request_id = sa.Column(sa.Text())
82 82 request_stats = sa.Column(JSON(), nullable=False, default={})
83 83 traceback = sa.Column(JSON(), nullable=False, default=None)
84 84 traceback_hash = sa.Column(sa.Text())
85 85 start_time = sa.Column(
86 86 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
87 87 )
88 88 end_time = sa.Column(sa.DateTime())
89 89 duration = sa.Column(sa.Float, default=0)
90 90 http_status = sa.Column(sa.Integer, index=True)
91 91 url_domain = sa.Column(sa.Unicode(100), index=True)
92 92 url_path = sa.Column(sa.Unicode(255), index=True)
93 93 tags = sa.Column(JSON(), nullable=False, default={})
94 94 language = sa.Column(sa.Integer(), default=0)
95 95 # this is used to determine partition for the report
96 96 report_group_time = sa.Column(
97 97 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
98 98 )
99 99
100 100 logs = sa.orm.relationship(
101 101 "Log",
102 102 lazy="dynamic",
103 103 passive_deletes=True,
104 104 passive_updates=True,
105 105 primaryjoin="and_(Report.request_id==Log.request_id, "
106 106 "Log.request_id != None, Log.request_id != '')",
107 107 foreign_keys="[Log.request_id]",
108 108 )
109 109
110 110 slow_calls = sa.orm.relationship(
111 111 "SlowCall",
112 112 backref="detail",
113 113 cascade="all, delete-orphan",
114 114 passive_deletes=True,
115 115 passive_updates=True,
116 116 order_by="SlowCall.timestamp",
117 117 )
118 118
119 119 def set_data(self, data, resource, protocol_version=None):
120 120 self.http_status = data["http_status"]
121 121 self.priority = data["priority"]
122 122 self.error = data["error"]
123 123 report_language = data.get("language", "").lower()
124 124 self.language = getattr(Language, report_language, Language.unknown)
125 125 # we need temp holder here to decide later
126 126 # if we want to commit the tags if report is marked for creation
127 127 self.tags = {"server_name": data["server"], "view_name": data["view_name"]}
128 128 if data.get("tags"):
129 129 for tag_tuple in data["tags"]:
130 130 self.tags[tag_tuple[0]] = tag_tuple[1]
131 131 self.traceback = data["traceback"]
132 132 stripped_traceback = self.stripped_traceback()
133 133 tb_repr = repr(stripped_traceback).encode("utf8")
134 134 self.traceback_hash = hashlib.sha1(tb_repr).hexdigest()
135 135 url_info = urllib.parse.urlsplit(data.get("url", ""), allow_fragments=False)
136 136 self.url_domain = url_info.netloc[:128]
137 137 self.url_path = url_info.path[:2048]
138 138 self.occurences = data["occurences"]
139 139 if self.error:
140 140 self.report_type = ReportType.error
141 141 else:
142 142 self.report_type = ReportType.slow
143 143
144 144 # but if its status 404 its 404 type
145 145 if self.http_status in [404, "404"] or self.error == "404 Not Found":
146 146 self.report_type = ReportType.not_found
147 147 self.error = ""
148 148
149 149 self.generate_grouping_hash(
150 150 data.get("appenlight.group_string", data.get("group_string")),
151 151 resource.default_grouping,
152 152 protocol_version,
153 153 )
154 154
155 155 # details
156 156 if data["http_status"] in [404, "404"]:
157 157 data = {
158 158 "username": data["username"],
159 159 "ip": data["ip"],
160 160 "url": data["url"],
161 161 "user_agent": data["user_agent"],
162 162 }
163 163 if data.get("HTTP_REFERER") or data.get("http_referer"):
164 164 data["HTTP_REFERER"] = data.get("HTTP_REFERER", "") or data.get(
165 165 "http_referer", ""
166 166 )
167 167
168 168 self.resource_id = resource.resource_id
169 169 self.username = data["username"]
170 170 self.user_agent = data["user_agent"]
171 171 self.ip = data["ip"]
172 172 self.extra = {}
173 173 if data.get("extra"):
174 174 for extra_tuple in data["extra"]:
175 175 self.extra[extra_tuple[0]] = extra_tuple[1]
176 176
177 177 self.url = data["url"]
178 178 self.request_id = data.get("request_id", "").replace("-", "") or str(
179 179 uuid.uuid4()
180 180 )
181 181 request_data = data.get("request", {})
182 182
183 183 self.request = request_data
184 184 self.request_stats = data.get("request_stats", {})
185 185 traceback = data.get("traceback")
186 186 if not traceback:
187 187 traceback = data.get("frameinfo")
188 188 self.traceback = traceback
189 189 start_date = convert_date(data.get("start_time"))
190 190 if not self.start_time or self.start_time < start_date:
191 191 self.start_time = start_date
192 192
193 193 self.end_time = convert_date(data.get("end_time"), False)
194 194 self.duration = 0
195 195
196 196 if self.start_time and self.end_time:
197 197 d = self.end_time - self.start_time
198 198 self.duration = d.total_seconds()
199 199
200 200 # update tags with other vars
201 201 if self.username:
202 202 self.tags["user_name"] = self.username
203 203 self.tags["report_language"] = Language.key_from_value(self.language)
204 204
205 205 def add_slow_calls(self, data, report_group):
206 206 slow_calls = []
207 207 for call in data.get("slow_calls", []):
208 208 sc_inst = SlowCall()
209 209 sc_inst.set_data(
210 210 call, resource_id=self.resource_id, report_group=report_group
211 211 )
212 212 slow_calls.append(sc_inst)
213 213 self.slow_calls.extend(slow_calls)
214 214 return slow_calls
215 215
216 216 def get_dict(self, request, details=False, exclude_keys=None, include_keys=None):
217 217 from appenlight.models.services.report_group import ReportGroupService
218 218
219 219 instance_dict = super(Report, self).get_dict()
220 220 instance_dict["req_stats"] = self.req_stats()
221 221 instance_dict["group"] = {}
222 222 instance_dict["group"]["id"] = self.report_group.id
223 223 instance_dict["group"]["total_reports"] = self.report_group.total_reports
224 224 instance_dict["group"]["last_report"] = self.report_group.last_report
225 225 instance_dict["group"]["priority"] = self.report_group.priority
226 226 instance_dict["group"]["occurences"] = self.report_group.occurences
227 227 instance_dict["group"]["last_timestamp"] = self.report_group.last_timestamp
228 228 instance_dict["group"]["first_timestamp"] = self.report_group.first_timestamp
229 229 instance_dict["group"]["public"] = self.report_group.public
230 230 instance_dict["group"]["fixed"] = self.report_group.fixed
231 231 instance_dict["group"]["read"] = self.report_group.read
232 232 instance_dict["group"]["average_duration"] = self.report_group.average_duration
233 233
234 234 instance_dict["resource_name"] = self.report_group.application.resource_name
235 235 instance_dict["report_type"] = self.report_type
236 236
237 237 if instance_dict["http_status"] == 404 and not instance_dict["error"]:
238 238 instance_dict["error"] = "404 Not Found"
239 239
240 240 if details:
241 241 instance_dict[
242 242 "affected_users_count"
243 243 ] = ReportGroupService.affected_users_count(self.report_group)
244 244 instance_dict["top_affected_users"] = [
245 245 {"username": u.username, "count": u.count}
246 246 for u in ReportGroupService.top_affected_users(self.report_group)
247 247 ]
248 248 instance_dict["application"] = {"integrations": []}
249 249 for integration in self.report_group.application.integrations:
250 250 if integration.front_visible:
251 251 instance_dict["application"]["integrations"].append(
252 252 {
253 253 "name": integration.integration_name,
254 254 "action": integration.integration_action,
255 255 }
256 256 )
257 257 instance_dict["comments"] = [
258 258 c.get_dict() for c in self.report_group.comments
259 259 ]
260 260
261 261 instance_dict["group"]["next_report"] = None
262 262 instance_dict["group"]["previous_report"] = None
263 263 next_in_group = self.get_next_in_group(request)
264 264 previous_in_group = self.get_previous_in_group(request)
265 265 if next_in_group:
266 266 instance_dict["group"]["next_report"] = next_in_group
267 267 if previous_in_group:
268 268 instance_dict["group"]["previous_report"] = previous_in_group
269 269
270 270 # slow call ordering
271 271 def find_parent(row, data):
272 272 for r in reversed(data):
273 273 try:
274 274 if (
275 275 row["timestamp"] > r["timestamp"]
276 276 and row["end_time"] < r["end_time"]
277 277 ):
278 278 return r
279 279 except TypeError as e:
280 280 log.warning("reports_view.find_parent: %s" % e)
281 281 return None
282 282
283 283 new_calls = []
284 284 calls = [c.get_dict() for c in self.slow_calls]
285 285 while calls:
286 286 # start from end
287 287 for x in range(len(calls) - 1, -1, -1):
288 288 parent = find_parent(calls[x], calls)
289 289 if parent:
290 290 parent["children"].append(calls[x])
291 291 else:
292 292 # no parent at all? append to new calls anyways
293 293 new_calls.append(calls[x])
294 294 # print 'append', calls[x]
295 295 del calls[x]
296 296 break
297 297 instance_dict["slow_calls"] = new_calls
298 298
299 299 instance_dict["front_url"] = self.get_public_url(request)
300 300
301 301 exclude_keys_list = exclude_keys or []
302 302 include_keys_list = include_keys or []
303 303 for k in list(instance_dict.keys()):
304 304 if k == "group":
305 305 continue
306 306 if k in exclude_keys_list or (k not in include_keys_list and include_keys):
307 307 del instance_dict[k]
308 308 return instance_dict
309 309
310 310 def get_previous_in_group(self, request):
311 311 query = {
312 312 "size": 1,
313 313 "query": {
314 "filtered": {
314 "bool": {
315 315 "filter": {
316 316 "and": [
317 317 {"term": {"group_id": self.group_id}},
318 318 {"range": {"pg_id": {"lt": self.id}}},
319 319 ]
320 320 }
321 321 }
322 322 },
323 323 "sort": [{"_doc": {"order": "desc"}}],
324 324 }
325 325 result = request.es_conn.search(
326 326 body=query, index=self.partition_id, doc_type="report"
327 327 )
328 328 if result["hits"]["total"]:
329 329 return result["hits"]["hits"][0]["_source"]["pg_id"]
330 330
331 331 def get_next_in_group(self, request):
332 332 query = {
333 333 "size": 1,
334 334 "query": {
335 "filtered": {
335 "bool": {
336 336 "filter": {
337 337 "and": [
338 338 {"term": {"group_id": self.group_id}},
339 339 {"range": {"pg_id": {"gt": self.id}}},
340 340 ]
341 341 }
342 342 }
343 343 },
344 344 "sort": [{"_doc": {"order": "asc"}}],
345 345 }
346 346 result = request.es_conn.search(
347 347 body=query, index=self.partition_id, doc_type="report"
348 348 )
349 349 if result["hits"]["total"]:
350 350 return result["hits"]["hits"][0]["_source"]["pg_id"]
351 351
352 352 def get_public_url(self, request=None, report_group=None, _app_url=None):
353 353 """
354 354 Returns url that user can use to visit specific report
355 355 """
356 356 if not request:
357 357 request = get_current_request()
358 358 url = request.route_url("/", _app_url=_app_url)
359 359 if report_group:
360 360 return (url + "ui/report/%s/%s") % (report_group.id, self.id)
361 361 return (url + "ui/report/%s/%s") % (self.group_id, self.id)
362 362
363 363 def req_stats(self):
364 364 stats = self.request_stats.copy()
365 365 stats["percentages"] = {}
366 366 stats["percentages"]["main"] = 100.0
367 367 main = stats.get("main", 0.0)
368 368 if not main:
369 369 return None
370 370 for name, call_time in stats.items():
371 371 if "calls" not in name and "main" not in name and "percentages" not in name:
372 372 stats["main"] -= call_time
373 373 stats["percentages"][name] = math.floor((call_time / main * 100.0))
374 374 stats["percentages"]["main"] -= stats["percentages"][name]
375 375 if stats["percentages"]["main"] < 0.0:
376 376 stats["percentages"]["main"] = 0.0
377 377 stats["main"] = 0.0
378 378 return stats
379 379
380 380 def generate_grouping_hash(
381 381 self, hash_string=None, default_grouping=None, protocol_version=None
382 382 ):
383 383 """
384 384 Generates SHA1 hash that will be used to group reports together
385 385 """
386 386 if not hash_string:
387 387 location = self.tags.get("view_name") or self.url_path
388 388 server_name = self.tags.get("server_name") or ""
389 389 if default_grouping == "url_traceback":
390 390 hash_string = "%s_%s_%s" % (self.traceback_hash, location, self.error)
391 391 if self.language == Language.javascript:
392 392 hash_string = "%s_%s" % (self.traceback_hash, self.error)
393 393
394 394 elif default_grouping == "traceback_server":
395 395 hash_string = "%s_%s" % (self.traceback_hash, server_name)
396 396 if self.language == Language.javascript:
397 397 hash_string = "%s_%s" % (self.traceback_hash, server_name)
398 398 else:
399 399 hash_string = "%s_%s" % (self.error, location)
400 400 month = datetime.utcnow().date().replace(day=1)
401 401 hash_string = "{}_{}".format(month, hash_string)
402 402 binary_string = hash_string.encode("utf8")
403 403 self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
404 404 return self.grouping_hash
405 405
406 406 def stripped_traceback(self):
407 407 """
408 408 Traceback without local vars
409 409 """
410 410 stripped_traceback = copy.deepcopy(self.traceback)
411 411
412 412 if isinstance(stripped_traceback, list):
413 413 for row in stripped_traceback:
414 414 row.pop("vars", None)
415 415 return stripped_traceback
416 416
417 417 def notify_channel(self, report_group):
418 418 """
419 419 Sends notification to websocket channel
420 420 """
421 421 settings = get_current_registry().settings
422 422 log.info("notify channelstream")
423 423 if self.report_type != ReportType.error:
424 424 return
425 425 payload = {
426 426 "type": "message",
427 427 "user": "__system__",
428 428 "channel": "app_%s" % self.resource_id,
429 429 "message": {
430 430 "topic": "front_dashboard.new_topic",
431 431 "report": {
432 432 "group": {
433 433 "priority": report_group.priority,
434 434 "first_timestamp": report_group.first_timestamp,
435 435 "last_timestamp": report_group.last_timestamp,
436 436 "average_duration": report_group.average_duration,
437 437 "occurences": report_group.occurences,
438 438 },
439 439 "report_id": self.id,
440 440 "group_id": self.group_id,
441 441 "resource_id": self.resource_id,
442 442 "http_status": self.http_status,
443 443 "url_domain": self.url_domain,
444 444 "url_path": self.url_path,
445 445 "error": self.error or "",
446 446 "server": self.tags.get("server_name"),
447 447 "view_name": self.tags.get("view_name"),
448 448 "front_url": self.get_public_url(),
449 449 },
450 450 },
451 451 }
452 452 channelstream_request(
453 453 settings["cometd.secret"],
454 454 "/message",
455 455 [payload],
456 456 servers=[settings["cometd_servers"]],
457 457 )
458 458
459 459 def es_doc(self):
460 460 tags = {}
461 461 tag_list = []
462 462 for name, value in self.tags.items():
463 463 name = name.replace(".", "_")
464 464 tag_list.append(name)
465 465 tags[name] = {
466 466 "values": convert_es_type(value),
467 467 "numeric_values": value
468 468 if (isinstance(value, (int, float)) and not isinstance(value, bool))
469 469 else None,
470 470 }
471 471
472 472 if "user_name" not in self.tags and self.username:
473 473 tags["user_name"] = {"value": [self.username], "numeric_value": None}
474 474 return {
475 475 "_id": str(self.id),
476 476 "pg_id": str(self.id),
477 477 "resource_id": self.resource_id,
478 478 "http_status": self.http_status or "",
479 479 "start_time": self.start_time,
480 480 "end_time": self.end_time,
481 481 "url_domain": self.url_domain if self.url_domain else "",
482 482 "url_path": self.url_path if self.url_path else "",
483 483 "duration": self.duration,
484 484 "error": self.error if self.error else "",
485 485 "report_type": self.report_type,
486 486 "request_id": self.request_id,
487 487 "ip": self.ip,
488 488 "group_id": str(self.group_id),
489 489 "_parent": str(self.group_id),
490 490 "tags": tags,
491 491 "tag_list": tag_list,
492 492 }
493 493
494 494 @property
495 495 def partition_id(self):
496 496 return "rcae_r_%s" % self.report_group_time.strftime("%Y_%m")
497 497
498 498 def partition_range(self):
499 499 start_date = self.report_group_time.date().replace(day=1)
500 500 end_date = start_date + timedelta(days=40)
501 501 end_date = end_date.replace(day=1)
502 502 return start_date, end_date
503 503
504 504
505 505 def after_insert(mapper, connection, target):
506 506 if not hasattr(target, "_skip_ft_index"):
507 507 data = target.es_doc()
508 508 data.pop("_id", None)
509 509 Datastores.es.index(
510 510 target.partition_id, "report", data, parent=target.group_id, id=target.id
511 511 )
512 512
513 513
514 514 def after_update(mapper, connection, target):
515 515 if not hasattr(target, "_skip_ft_index"):
516 516 data = target.es_doc()
517 517 data.pop("_id", None)
518 518 Datastores.es.index(
519 519 target.partition_id, "report", data, parent=target.group_id, id=target.id
520 520 )
521 521
522 522
523 523 def after_delete(mapper, connection, target):
524 524 if not hasattr(target, "_skip_ft_index"):
525 525 query = {"query": {"term": {"pg_id": target.id}}}
526 526 Datastores.es.transport.perform_request(
527 527 "DELETE", "/{}/{}/_query".format(target.partition_id, "report"), body=query
528 528 )
529 529
530 530
531 531 sa.event.listen(Report, "after_insert", after_insert)
532 532 sa.event.listen(Report, "after_update", after_update)
533 533 sa.event.listen(Report, "after_delete", after_delete)
@@ -1,222 +1,222 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import paginate
18 18 import logging
19 19 import sqlalchemy as sa
20 20
21 21 from appenlight.models.log import Log
22 22 from appenlight.models import get_db_session, Datastores
23 23 from appenlight.models.services.base import BaseService
24 24 from appenlight.lib.utils import es_index_name_limiter
25 25
26 26 log = logging.getLogger(__name__)
27 27
28 28
29 29 class LogService(BaseService):
30 30 @classmethod
31 31 def get_logs(cls, resource_ids=None, filter_settings=None, db_session=None):
32 32 # ensure we always have id's passed
33 33 if not resource_ids:
34 34 # raise Exception('No App ID passed')
35 35 return []
36 36 db_session = get_db_session(db_session)
37 37 q = db_session.query(Log)
38 38 q = q.filter(Log.resource_id.in_(resource_ids))
39 39 if filter_settings.get("start_date"):
40 40 q = q.filter(Log.timestamp >= filter_settings.get("start_date"))
41 41 if filter_settings.get("end_date"):
42 42 q = q.filter(Log.timestamp <= filter_settings.get("end_date"))
43 43 if filter_settings.get("log_level"):
44 44 q = q.filter(Log.log_level == filter_settings.get("log_level").upper())
45 45 if filter_settings.get("request_id"):
46 46 request_id = filter_settings.get("request_id", "")
47 47 q = q.filter(Log.request_id == request_id.replace("-", ""))
48 48 if filter_settings.get("namespace"):
49 49 q = q.filter(Log.namespace == filter_settings.get("namespace"))
50 50 q = q.order_by(sa.desc(Log.timestamp))
51 51 return q
52 52
53 53 @classmethod
54 54 def es_query_builder(cls, app_ids, filter_settings):
55 55 if not filter_settings:
56 56 filter_settings = {}
57 57
58 58 query = {
59 59 "query": {
60 "filtered": {
60 "bool": {
61 61 "filter": {"and": [{"terms": {"resource_id": list(app_ids)}}]}
62 62 }
63 63 }
64 64 }
65 65
66 66 start_date = filter_settings.get("start_date")
67 67 end_date = filter_settings.get("end_date")
68 filter_part = query["query"]["filtered"]["filter"]["and"]
68 filter_part = query["query"]["bool"]["filter"]["and"]
69 69
70 70 for tag in filter_settings.get("tags", []):
71 71 tag_values = [v.lower() for v in tag["value"]]
72 72 key = "tags.%s.values" % tag["name"].replace(".", "_")
73 73 filter_part.append({"terms": {key: tag_values}})
74 74
75 75 date_range = {"range": {"timestamp": {}}}
76 76 if start_date:
77 77 date_range["range"]["timestamp"]["gte"] = start_date
78 78 if end_date:
79 79 date_range["range"]["timestamp"]["lte"] = end_date
80 80 if start_date or end_date:
81 81 filter_part.append(date_range)
82 82
83 83 levels = filter_settings.get("level")
84 84 if levels:
85 85 filter_part.append({"terms": {"log_level": levels}})
86 86 namespaces = filter_settings.get("namespace")
87 87 if namespaces:
88 88 filter_part.append({"terms": {"namespace": namespaces}})
89 89
90 90 request_ids = filter_settings.get("request_id")
91 91 if request_ids:
92 92 filter_part.append({"terms": {"request_id": request_ids}})
93 93
94 94 messages = filter_settings.get("message")
95 95 if messages:
96 query["query"]["filtered"]["query"] = {
96 query["query"]["bool"]["must"] = {
97 97 "match": {"message": {"query": " ".join(messages), "operator": "and"}}
98 98 }
99 99 return query
100 100
101 101 @classmethod
102 102 def get_time_series_aggregate(cls, app_ids=None, filter_settings=None):
103 103 if not app_ids:
104 104 return {}
105 105 es_query = cls.es_query_builder(app_ids, filter_settings)
106 106 es_query["aggs"] = {
107 107 "events_over_time": {
108 108 "date_histogram": {
109 109 "field": "timestamp",
110 110 "interval": "1h",
111 111 "min_doc_count": 0,
112 112 "extended_bounds": {
113 113 "max": filter_settings.get("end_date"),
114 114 "min": filter_settings.get("start_date"),
115 115 },
116 116 }
117 117 }
118 118 }
119 119 log.debug(es_query)
120 120 index_names = es_index_name_limiter(
121 121 filter_settings.get("start_date"),
122 122 filter_settings.get("end_date"),
123 123 ixtypes=["logs"],
124 124 )
125 125 if index_names:
126 126 results = Datastores.es.search(
127 127 body=es_query, index=index_names, doc_type="log", size=0
128 128 )
129 129 else:
130 130 results = []
131 131 return results
132 132
133 133 @classmethod
134 134 def get_search_iterator(
135 cls,
136 app_ids=None,
137 page=1,
138 items_per_page=50,
139 order_by=None,
140 filter_settings=None,
141 limit=None,
135 cls,
136 app_ids=None,
137 page=1,
138 items_per_page=50,
139 order_by=None,
140 filter_settings=None,
141 limit=None,
142 142 ):
143 143 if not app_ids:
144 144 return {}, 0
145 145
146 146 es_query = cls.es_query_builder(app_ids, filter_settings)
147 147 sort_query = {"sort": [{"timestamp": {"order": "desc"}}]}
148 148 es_query.update(sort_query)
149 149 log.debug(es_query)
150 150 es_from = (page - 1) * items_per_page
151 151 index_names = es_index_name_limiter(
152 152 filter_settings.get("start_date"),
153 153 filter_settings.get("end_date"),
154 154 ixtypes=["logs"],
155 155 )
156 156 if not index_names:
157 157 return {}, 0
158 158
159 159 results = Datastores.es.search(
160 160 body=es_query,
161 161 index=index_names,
162 162 doc_type="log",
163 163 size=items_per_page,
164 164 from_=es_from,
165 165 )
166 166 if results["hits"]["total"] > 5000:
167 167 count = 5000
168 168 else:
169 169 count = results["hits"]["total"]
170 170 return results["hits"], count
171 171
172 172 @classmethod
173 173 def get_paginator_by_app_ids(
174 cls,
175 app_ids=None,
176 page=1,
177 item_count=None,
178 items_per_page=50,
179 order_by=None,
180 filter_settings=None,
181 exclude_columns=None,
182 db_session=None,
174 cls,
175 app_ids=None,
176 page=1,
177 item_count=None,
178 items_per_page=50,
179 order_by=None,
180 filter_settings=None,
181 exclude_columns=None,
182 db_session=None,
183 183 ):
184 184 if not filter_settings:
185 185 filter_settings = {}
186 186 results, item_count = cls.get_search_iterator(
187 187 app_ids, page, items_per_page, order_by, filter_settings
188 188 )
189 189 paginator = paginate.Page(
190 190 [], item_count=item_count, items_per_page=items_per_page, **filter_settings
191 191 )
192 192 ordered_ids = tuple(
193 193 item["_source"]["pg_id"] for item in results.get("hits", [])
194 194 )
195 195
196 196 sorted_instance_list = []
197 197 if ordered_ids:
198 198 db_session = get_db_session(db_session)
199 199 query = db_session.query(Log)
200 200 query = query.filter(Log.log_id.in_(ordered_ids))
201 201 query = query.order_by(sa.desc("timestamp"))
202 202 sa_items = query.all()
203 203 # resort by score
204 204 for i_id in ordered_ids:
205 205 for item in sa_items:
206 206 if str(item.log_id) == str(i_id):
207 207 sorted_instance_list.append(item)
208 208 paginator.sa_items = sorted_instance_list
209 209 return paginator
210 210
211 211 @classmethod
212 212 def query_by_primary_key_and_namespace(cls, list_of_pairs, db_session=None):
213 213 db_session = get_db_session(db_session)
214 214 list_of_conditions = []
215 215 query = db_session.query(Log)
216 216 for pair in list_of_pairs:
217 217 list_of_conditions.append(
218 218 sa.and_(Log.primary_key == pair["pk"], Log.namespace == pair["ns"])
219 219 )
220 220 query = query.filter(sa.or_(*list_of_conditions))
221 221 query = query.order_by(sa.asc(Log.timestamp), sa.asc(Log.log_id))
222 222 return query
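
For LogService.es_query_builder above, the free-text "message" part that previously sat under "filtered"/"query" now goes under "bool"/"must", while the structural conditions stay in the "filter" branch. A small usage sketch with illustrative filter values (dates, levels, and app ids are made up for the example):

filter_settings = {
    "start_date": "2017-05-01T00:00:00",
    "end_date": "2017-05-02T00:00:00",
    "level": ["ERROR"],
    "namespace": ["myapp.views"],
    "message": ["connection", "refused"],
}
query = LogService.es_query_builder(app_ids=[1, 2], filter_settings=filter_settings)
# query["query"]["bool"]["must"] holds the match clause built from "message",
# query["query"]["bool"]["filter"]["and"] holds the terms/range clauses.
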
@@ -1,521 +1,523 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import logging
18 18 import paginate
19 19 import sqlalchemy as sa
20 20 import appenlight.lib.helpers as h
21 21
22 22 from datetime import datetime
23 23
24 24 from appenlight.models import get_db_session, Datastores
25 25 from appenlight.models.report import Report
26 26 from appenlight.models.report_group import ReportGroup
27 27 from appenlight.models.report_comment import ReportComment
28 28 from appenlight.models.user import User
29 29 from appenlight.models.services.base import BaseService
30 30 from appenlight.lib.enums import ReportType
31 31 from appenlight.lib.utils import es_index_name_limiter
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35
36 36 class ReportGroupService(BaseService):
37 37 @classmethod
38 38 def get_trending(cls, request, filter_settings, limit=15, db_session=None):
39 39 """
40 40         Returns report groups trending for a specific time interval
41 41 """
42 42 db_session = get_db_session(db_session)
43 43
44 44 tags = []
45 45 if filter_settings.get("tags"):
46 46 for tag in filter_settings["tags"]:
47 47 tags.append(
48 48 {"terms": {"tags.{}.values".format(tag["name"]): tag["value"]}}
49 49 )
50 50
51 51 index_names = es_index_name_limiter(
52 52 start_date=filter_settings["start_date"],
53 53 end_date=filter_settings["end_date"],
54 54 ixtypes=["reports"],
55 55 )
56 56
57 57 if not index_names or not filter_settings["resource"]:
58 58 return []
59 59
60 60 es_query = {
61 61 "aggs": {
62 62 "parent_agg": {
63 63 "aggs": {
64 64 "groups": {
65 65 "aggs": {
66 66 "sub_agg": {
67 67 "value_count": {"field": "tags.group_id.values"}
68 68 }
69 69 },
70 70 "filter": {"exists": {"field": "tags.group_id.values"}},
71 71 }
72 72 },
73 73 "terms": {"field": "tags.group_id.values", "size": limit},
74 74 }
75 75 },
76 76 "query": {
77 "filtered": {
77 "bool": {
78 78 "filter": {
79 79 "and": [
80 80 {
81 81 "terms": {
82 82 "resource_id": [filter_settings["resource"][0]]
83 83 }
84 84 },
85 85 {
86 86 "range": {
87 87 "timestamp": {
88 88 "gte": filter_settings["start_date"],
89 89 "lte": filter_settings["end_date"],
90 90 }
91 91 }
92 92 },
93 93 ]
94 94 }
95 95 }
96 96 },
97 97 }
98 98 if tags:
99 es_query["query"]["filtered"]["filter"]["and"].extend(tags)
99 es_query["query"]["bool"]["filter"]["and"].extend(tags)
100 100
101 101 result = Datastores.es.search(
102 102 body=es_query, index=index_names, doc_type="log", size=0
103 103 )
104 104 series = []
105 105 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
106 106 series.append(
107 107 {"key": bucket["key"], "groups": bucket["groups"]["sub_agg"]["value"]}
108 108 )
109 109
110 110 report_groups_d = {}
111 111 for g in series:
112 112 report_groups_d[int(g["key"])] = g["groups"] or 0
113 113
114 114 query = db_session.query(ReportGroup)
115 115 query = query.filter(ReportGroup.id.in_(list(report_groups_d.keys())))
116 116 query = query.options(sa.orm.joinedload(ReportGroup.last_report_ref))
117 117 results = [(report_groups_d[group.id], group) for group in query]
118 118 return sorted(results, reverse=True, key=lambda x: x[0])
119 119
120 120 @classmethod
121 121 def get_search_iterator(
122 122 cls,
123 123 app_ids=None,
124 124 page=1,
125 125 items_per_page=50,
126 126 order_by=None,
127 127 filter_settings=None,
128 128 limit=None,
129 129 ):
130 130 if not app_ids:
131 131 return {}
132 132 if not filter_settings:
133 133 filter_settings = {}
134 134
135 135 query = {
136 136 "size": 0,
137 137 "query": {
138 "filtered": {
138 "bool": {
139 "must": [],
140 "should": [],
139 141 "filter": {"and": [{"terms": {"resource_id": list(app_ids)}}]}
140 142 }
141 143 },
142 144 "aggs": {
143 145 "top_groups": {
144 146 "terms": {
145 147 "size": 5000,
146 148 "field": "_parent",
147 149 "order": {"newest": "desc"},
148 150 },
149 151 "aggs": {
150 152 "top_reports_hits": {
151 153 "top_hits": {"size": 1, "sort": {"start_time": "desc"}}
152 154 },
153 155 "newest": {"max": {"field": "start_time"}},
154 156 },
155 157 }
156 158 },
157 159 }
158 160
159 161 start_date = filter_settings.get("start_date")
160 162 end_date = filter_settings.get("end_date")
161 filter_part = query["query"]["filtered"]["filter"]["and"]
163 filter_part = query["query"]["bool"]["filter"]["and"]
162 164 date_range = {"range": {"start_time": {}}}
163 165 if start_date:
164 166 date_range["range"]["start_time"]["gte"] = start_date
165 167 if end_date:
166 168 date_range["range"]["start_time"]["lte"] = end_date
167 169 if start_date or end_date:
168 170 filter_part.append(date_range)
169 171
170 172 priorities = filter_settings.get("priority")
171 173
172 174 for tag in filter_settings.get("tags", []):
173 175 tag_values = [v.lower() for v in tag["value"]]
174 176 key = "tags.%s.values" % tag["name"].replace(".", "_")
175 177 filter_part.append({"terms": {key: tag_values}})
176 178
177 179 if priorities:
178 180 filter_part.append(
179 181 {
180 182 "has_parent": {
181 183 "parent_type": "report_group",
182 184 "query": {"terms": {"priority": priorities}},
183 185 }
184 186 }
185 187 )
186 188
187 189 min_occurences = filter_settings.get("min_occurences")
188 190 if min_occurences:
189 191 filter_part.append(
190 192 {
191 193 "has_parent": {
192 194 "parent_type": "report_group",
193 195 "query": {"range": {"occurences": {"gte": min_occurences[0]}}},
194 196 }
195 197 }
196 198 )
197 199
198 200 min_duration = filter_settings.get("min_duration")
199 201 max_duration = filter_settings.get("max_duration")
200 202
201 203 request_ids = filter_settings.get("request_id")
202 204 if request_ids:
203 205 filter_part.append({"terms": {"request_id": request_ids}})
204 206
205 207 duration_range = {"range": {"average_duration": {}}}
206 208 if min_duration:
207 209 duration_range["range"]["average_duration"]["gte"] = min_duration[0]
208 210 if max_duration:
209 211 duration_range["range"]["average_duration"]["lte"] = max_duration[0]
210 212 if min_duration or max_duration:
211 213 filter_part.append(
212 214 {"has_parent": {"parent_type": "report_group", "query": duration_range}}
213 215 )
214 216
215 217 http_status = filter_settings.get("http_status")
216 218 report_type = filter_settings.get("report_type", [ReportType.error])
217 219         # apply the report type filter if no http status filter is given
218 220         # or when we are dealing with slow reports
219 221 if not http_status or ReportType.slow in report_type:
220 222 filter_part.append({"terms": {"report_type": report_type}})
221 223 if http_status:
222 224 filter_part.append({"terms": {"http_status": http_status}})
223 225
224 226 messages = filter_settings.get("message")
225 227 if messages:
226 228 condition = {"match": {"message": " ".join(messages)}}
227 query["query"]["filtered"]["query"] = condition
229 query["query"]["bool"]["must"].append(condition)
228 230 errors = filter_settings.get("error")
229 231 if errors:
230 232 condition = {"match": {"error": " ".join(errors)}}
231 query["query"]["filtered"]["query"] = condition
233 query["query"]["bool"]["must"].append(condition)
232 234 url_domains = filter_settings.get("url_domain")
233 235 if url_domains:
234 236 condition = {"terms": {"url_domain": url_domains}}
235 query["query"]["filtered"]["query"] = condition
237 query["query"]["bool"]["must"].append(condition)
236 238 url_paths = filter_settings.get("url_path")
237 239 if url_paths:
238 240 condition = {"terms": {"url_path": url_paths}}
239 query["query"]["filtered"]["query"] = condition
241 query["query"]["bool"]["must"].append(condition)
240 242
241 243 if filter_settings.get("report_status"):
242 244 for status in filter_settings.get("report_status"):
243 245 if status == "never_reviewed":
244 246 filter_part.append(
245 247 {
246 248 "has_parent": {
247 249 "parent_type": "report_group",
248 250 "query": {"term": {"read": False}},
249 251 }
250 252 }
251 253 )
252 254 elif status == "reviewed":
253 255 filter_part.append(
254 256 {
255 257 "has_parent": {
256 258 "parent_type": "report_group",
257 259 "query": {"term": {"read": True}},
258 260 }
259 261 }
260 262 )
261 263 elif status == "public":
262 264 filter_part.append(
263 265 {
264 266 "has_parent": {
265 267 "parent_type": "report_group",
266 268 "query": {"term": {"public": True}},
267 269 }
268 270 }
269 271 )
270 272 elif status == "fixed":
271 273 filter_part.append(
272 274 {
273 275 "has_parent": {
274 276 "parent_type": "report_group",
275 277 "query": {"term": {"fixed": True}},
276 278 }
277 279 }
278 280 )
279 281
280 282 # logging.getLogger('pyelasticsearch').setLevel(logging.DEBUG)
281 283 index_names = es_index_name_limiter(
282 284 filter_settings.get("start_date"),
283 285 filter_settings.get("end_date"),
284 286 ixtypes=["reports"],
285 287 )
286 288 if index_names:
287 289 results = Datastores.es.search(
288 290 body=query,
289 291 index=index_names,
290 292 doc_type=["report", "report_group"],
291 293 size=0,
292 294 )
293 295 else:
294 296 return []
295 297 return results["aggregations"]
296 298
297 299 @classmethod
298 300 def get_paginator_by_app_ids(
299 301 cls,
300 302 app_ids=None,
301 303 page=1,
302 304 item_count=None,
303 305 items_per_page=50,
304 306 order_by=None,
305 307 filter_settings=None,
306 308 exclude_columns=None,
307 309 db_session=None,
308 310 ):
309 311 if not filter_settings:
310 312 filter_settings = {}
311 313 results = cls.get_search_iterator(
312 314 app_ids, page, items_per_page, order_by, filter_settings
313 315 )
314 316
315 317 ordered_ids = []
316 318 if results:
317 319 for item in results["top_groups"]["buckets"]:
318 320 pg_id = item["top_reports_hits"]["hits"]["hits"][0]["_source"]["pg_id"]
319 321 ordered_ids.append(pg_id)
320 322 log.info(filter_settings)
321 323 paginator = paginate.Page(
322 324 ordered_ids, items_per_page=items_per_page, **filter_settings
323 325 )
324 326 sa_items = ()
325 327 if paginator.items:
326 328 db_session = get_db_session(db_session)
327 329 # latest report detail
328 330 query = db_session.query(Report)
329 331 query = query.options(sa.orm.joinedload(Report.report_group))
330 332 query = query.filter(Report.id.in_(paginator.items))
331 333 if filter_settings.get("order_col"):
332 334 order_col = filter_settings.get("order_col")
333 335 if filter_settings.get("order_dir") == "dsc":
334 336 sort_on = "desc"
335 337 else:
336 338 sort_on = "asc"
337 339 if order_col == "when":
338 340 order_col = "last_timestamp"
339 341 query = query.order_by(
340 342 getattr(sa, sort_on)(getattr(ReportGroup, order_col))
341 343 )
342 344 sa_items = query.all()
343 345 sorted_instance_list = []
344 346 for i_id in ordered_ids:
345 347 for report in sa_items:
346 348 if str(report.id) == i_id and report not in sorted_instance_list:
347 349 sorted_instance_list.append(report)
348 350 paginator.sa_items = sorted_instance_list
349 351 return paginator
350 352
351 353 @classmethod
352 354 def by_app_ids(cls, app_ids=None, order_by=True, db_session=None):
353 355 db_session = get_db_session(db_session)
354 356 q = db_session.query(ReportGroup)
355 357 if app_ids:
356 358 q = q.filter(ReportGroup.resource_id.in_(app_ids))
357 359 if order_by:
358 360 q = q.order_by(sa.desc(ReportGroup.id))
359 361 return q
360 362
361 363 @classmethod
362 364 def by_id(cls, group_id, app_ids=None, db_session=None):
363 365 db_session = get_db_session(db_session)
364 366 q = db_session.query(ReportGroup).filter(ReportGroup.id == int(group_id))
365 367 if app_ids:
366 368 q = q.filter(ReportGroup.resource_id.in_(app_ids))
367 369 return q.first()
368 370
369 371 @classmethod
370 372 def by_ids(cls, group_ids=None, db_session=None):
371 373 db_session = get_db_session(db_session)
372 374 query = db_session.query(ReportGroup)
373 375 query = query.filter(ReportGroup.id.in_(group_ids))
374 376 return query
375 377
376 378 @classmethod
377 379 def by_hash_and_resource(
378 380 cls, resource_id, grouping_hash, since_when=None, db_session=None
379 381 ):
380 382 db_session = get_db_session(db_session)
381 383 q = db_session.query(ReportGroup)
382 384 q = q.filter(ReportGroup.resource_id == resource_id)
383 385 q = q.filter(ReportGroup.grouping_hash == grouping_hash)
384 386 q = q.filter(ReportGroup.fixed == False)
385 387 if since_when:
386 388 q = q.filter(ReportGroup.first_timestamp >= since_when)
387 389 return q.first()
388 390
389 391 @classmethod
390 392 def users_commenting(cls, report_group, exclude_user_id=None, db_session=None):
391 393 db_session = get_db_session(None, report_group)
392 394 query = db_session.query(User).distinct()
393 395 query = query.filter(User.id == ReportComment.owner_id)
394 396 query = query.filter(ReportComment.group_id == report_group.id)
395 397 if exclude_user_id:
396 398 query = query.filter(ReportComment.owner_id != exclude_user_id)
397 399 return query
398 400
399 401 @classmethod
400 402 def affected_users_count(cls, report_group, db_session=None):
401 403 db_session = get_db_session(db_session)
402 404 query = db_session.query(sa.func.count(Report.username))
403 405 query = query.filter(Report.group_id == report_group.id)
404 406 query = query.filter(Report.username != "")
405 407 query = query.filter(Report.username != None)
406 408 query = query.group_by(Report.username)
407 409 return query.count()
408 410
409 411 @classmethod
410 412 def top_affected_users(cls, report_group, db_session=None):
411 413 db_session = get_db_session(db_session)
412 414 count_label = sa.func.count(Report.username).label("count")
413 415 query = db_session.query(Report.username, count_label)
414 416 query = query.filter(Report.group_id == report_group.id)
415 417 query = query.filter(Report.username != None)
416 418 query = query.filter(Report.username != "")
417 419 query = query.group_by(Report.username)
418 420 query = query.order_by(sa.desc(count_label))
419 421 query = query.limit(50)
420 422 return query
421 423
422 424 @classmethod
423 425 def get_report_stats(cls, request, filter_settings):
424 426 """
425 427         Gets report dashboard graphs
426 428         Returns information for BAR charts with occurrences/interval information;
427 429         the detailed variant returns time intervals, the non-detailed variant
428 430         returns the total sum
429 431 """
430 432 delta = filter_settings["end_date"] - filter_settings["start_date"]
431 433 if delta < h.time_deltas.get("12h")["delta"]:
432 434 interval = "1m"
433 435 elif delta <= h.time_deltas.get("3d")["delta"]:
434 436 interval = "5m"
435 437 elif delta >= h.time_deltas.get("2w")["delta"]:
436 438 interval = "24h"
437 439 else:
438 440 interval = "1h"
439 441
440 442 group_id = filter_settings.get("group_id")
441 443
442 444 es_query = {
443 445 "aggs": {
444 446 "parent_agg": {
445 447 "aggs": {
446 448 "types": {
447 449 "aggs": {
448 450 "sub_agg": {"terms": {"field": "tags.type.values"}}
449 451 },
450 452 "filter": {
451 453 "and": [{"exists": {"field": "tags.type.values"}}]
452 454 },
453 455 }
454 456 },
455 457 "date_histogram": {
456 458 "extended_bounds": {
457 459 "max": filter_settings["end_date"],
458 460 "min": filter_settings["start_date"],
459 461 },
460 462 "field": "timestamp",
461 463 "interval": interval,
462 464 "min_doc_count": 0,
463 465 },
464 466 }
465 467 },
466 468 "query": {
467 "filtered": {
469 "bool": {
468 470 "filter": {
469 471 "and": [
470 472 {
471 473 "terms": {
472 474 "resource_id": [filter_settings["resource"][0]]
473 475 }
474 476 },
475 477 {
476 478 "range": {
477 479 "timestamp": {
478 480 "gte": filter_settings["start_date"],
479 481 "lte": filter_settings["end_date"],
480 482 }
481 483 }
482 484 },
483 485 ]
484 486 }
485 487 }
486 488 },
487 489 }
488 490 if group_id:
489 491 parent_agg = es_query["aggs"]["parent_agg"]
490 492 filters = parent_agg["aggs"]["types"]["filter"]["and"]
491 493 filters.append({"terms": {"tags.group_id.values": [group_id]}})
492 494
493 495 index_names = es_index_name_limiter(
494 496 start_date=filter_settings["start_date"],
495 497 end_date=filter_settings["end_date"],
496 498 ixtypes=["reports"],
497 499 )
498 500
499 501 if not index_names:
500 502 return []
501 503
502 504 result = Datastores.es.search(
503 505 body=es_query, index=index_names, doc_type="log", size=0
504 506 )
505 507 series = []
506 508 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
507 509 point = {
508 510 "x": datetime.utcfromtimestamp(int(bucket["key"]) / 1000),
509 511 "report": 0,
510 512 "not_found": 0,
511 513 "slow_report": 0,
512 514 }
513 515 for subbucket in bucket["types"]["sub_agg"]["buckets"]:
514 516 if subbucket["key"] == "slow":
515 517 point["slow_report"] = subbucket["doc_count"]
516 518 elif subbucket["key"] == "error":
517 519 point["report"] = subbucket["doc_count"]
518 520 elif subbucket["key"] == "not_found":
519 521 point["not_found"] = subbucket["doc_count"]
520 522 series.append(point)
521 523 return series
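
One behavioural detail of the get_search_iterator change above: previously each of the message, error, url_domain and url_path conditions was assigned to query["query"]["filtered"]["query"], so the last assignment silently overwrote the earlier ones; with the "bool" query they are appended to "must" and therefore combine. A tiny sketch with made-up filter values:

    must = []
    filter_settings = {"message": ["timeout"], "url_domain": ["example.com"]}  # illustrative
    if filter_settings.get("message"):
        must.append({"match": {"message": " ".join(filter_settings["message"])}})
    if filter_settings.get("url_domain"):
        must.append({"terms": {"url_domain": filter_settings["url_domain"]}})
    query = {
        "query": {
            "bool": {
                "must": must,
                "should": [],
                "filter": {"and": [{"terms": {"resource_id": [1]}}]},
            }
        }
    }
    assert len(query["query"]["bool"]["must"]) == 2  # both conditions are kept
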
@@ -1,63 +1,63 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from appenlight.models import Datastores
18 18 from appenlight.models.services.base import BaseService
19 19 from appenlight.lib.enums import ReportType
20 20 from appenlight.lib.utils import es_index_name_limiter
21 21
22 22
23 23 class ReportStatService(BaseService):
24 24 @classmethod
25 25 def count_by_type(cls, report_type, resource_id, since_when):
26 26 report_type = ReportType.key_from_value(report_type)
27 27
28 28 index_names = es_index_name_limiter(start_date=since_when, ixtypes=["reports"])
29 29
30 30 es_query = {
31 31 "aggs": {
32 32 "reports": {
33 33 "aggs": {
34 34 "sub_agg": {"value_count": {"field": "tags.group_id.values"}}
35 35 },
36 36 "filter": {
37 37 "and": [
38 38 {"terms": {"resource_id": [resource_id]}},
39 39 {"exists": {"field": "tags.group_id.values"}},
40 40 ]
41 41 },
42 42 }
43 43 },
44 44 "query": {
45 "filtered": {
45 "bool": {
46 46 "filter": {
47 47 "and": [
48 48 {"terms": {"resource_id": [resource_id]}},
49 49 {"terms": {"tags.type.values": [report_type]}},
50 50 {"range": {"timestamp": {"gte": since_when}}},
51 51 ]
52 52 }
53 53 }
54 54 },
55 55 }
56 56
57 57 if index_names:
58 58 result = Datastores.es.search(
59 59 body=es_query, index=index_names, doc_type="log", size=0
60 60 )
61 61 return result["aggregations"]["reports"]["sub_agg"]["value"]
62 62 else:
63 63 return 0
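
The migrated count_by_type query above still wraps its filters in the legacy {"and": [...]} construct; on newer Elasticsearch versions bool.filter accepts a plain list, so a fully modern body (a hypothetical variant, not what this changeset ships) could drop the wrapper entirely:

    def modern_count_by_type_query(resource_id, report_type, since_when):
        """Hypothetical modern form of the body above: bool.filter takes a plain
        list of filters, so the legacy {"and": [...]} wrapper is not needed."""
        return {
            "aggs": {
                "reports": {
                    "aggs": {
                        "sub_agg": {"value_count": {"field": "tags.group_id.values"}}
                    },
                    "filter": {
                        "bool": {
                            "filter": [
                                {"terms": {"resource_id": [resource_id]}},
                                {"exists": {"field": "tags.group_id.values"}},
                            ]
                        }
                    },
                }
            },
            "query": {
                "bool": {
                    "filter": [
                        {"terms": {"resource_id": [resource_id]}},
                        {"terms": {"tags.type.values": [report_type]}},
                        {"range": {"timestamp": {"gte": since_when}}},
                    ]
                }
            },
        }

    body = modern_count_by_type_query(1, "error", "2017-01-01T00:00:00")
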
@@ -1,617 +1,617 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from datetime import datetime
18 18
19 19 import appenlight.lib.helpers as h
20 20 from appenlight.models import get_db_session, Datastores
21 21 from appenlight.models.services.base import BaseService
22 22 from appenlight.lib.enums import ReportType
23 23 from appenlight.lib.utils import es_index_name_limiter
24 24
25 25 try:
26 26 from ae_uptime_ce.models.services.uptime_metric import UptimeMetricService
27 27 except ImportError:
28 28 UptimeMetricService = None
29 29
30 30
31 31 def check_key(key, stats, uptime, total_seconds):
32 32 if key not in stats:
33 33 stats[key] = {
34 34 "name": key,
35 35 "requests": 0,
36 36 "errors": 0,
37 37 "tolerated_requests": 0,
38 38 "frustrating_requests": 0,
39 39 "satisfying_requests": 0,
40 40 "total_minutes": total_seconds / 60.0,
41 41 "uptime": uptime,
42 42 "apdex": 0,
43 43 "rpm": 0,
44 44 "response_time": 0,
45 45 "avg_response_time": 0,
46 46 }
47 47
48 48
49 49 class RequestMetricService(BaseService):
50 50 @classmethod
51 51 def get_metrics_stats(cls, request, filter_settings, db_session=None):
52 52 delta = filter_settings["end_date"] - filter_settings["start_date"]
53 53 if delta < h.time_deltas.get("12h")["delta"]:
54 54 interval = "1m"
55 55 elif delta <= h.time_deltas.get("3d")["delta"]:
56 56 interval = "5m"
57 57 elif delta >= h.time_deltas.get("2w")["delta"]:
58 58 interval = "24h"
59 59 else:
60 60 interval = "1h"
61 61
62 62 filter_settings["namespace"] = ["appenlight.request_metric"]
63 63
64 64 es_query = {
65 65 "aggs": {
66 66 "parent_agg": {
67 67 "aggs": {
68 68 "custom": {
69 69 "aggs": {
70 70 "sub_agg": {
71 71 "sum": {"field": "tags.custom.numeric_values"}
72 72 }
73 73 },
74 74 "filter": {
75 75 "exists": {"field": "tags.custom.numeric_values"}
76 76 },
77 77 },
78 78 "main": {
79 79 "aggs": {
80 80 "sub_agg": {
81 81 "sum": {"field": "tags.main.numeric_values"}
82 82 }
83 83 },
84 84 "filter": {"exists": {"field": "tags.main.numeric_values"}},
85 85 },
86 86 "nosql": {
87 87 "aggs": {
88 88 "sub_agg": {
89 89 "sum": {"field": "tags.nosql.numeric_values"}
90 90 }
91 91 },
92 92 "filter": {
93 93 "exists": {"field": "tags.nosql.numeric_values"}
94 94 },
95 95 },
96 96 "remote": {
97 97 "aggs": {
98 98 "sub_agg": {
99 99 "sum": {"field": "tags.remote.numeric_values"}
100 100 }
101 101 },
102 102 "filter": {
103 103 "exists": {"field": "tags.remote.numeric_values"}
104 104 },
105 105 },
106 106 "requests": {
107 107 "aggs": {
108 108 "sub_agg": {
109 109 "sum": {"field": "tags.requests.numeric_values"}
110 110 }
111 111 },
112 112 "filter": {
113 113 "exists": {"field": "tags.requests.numeric_values"}
114 114 },
115 115 },
116 116 "sql": {
117 117 "aggs": {
118 118 "sub_agg": {"sum": {"field": "tags.sql.numeric_values"}}
119 119 },
120 120 "filter": {"exists": {"field": "tags.sql.numeric_values"}},
121 121 },
122 122 "tmpl": {
123 123 "aggs": {
124 124 "sub_agg": {
125 125 "sum": {"field": "tags.tmpl.numeric_values"}
126 126 }
127 127 },
128 128 "filter": {"exists": {"field": "tags.tmpl.numeric_values"}},
129 129 },
130 130 },
131 131 "date_histogram": {
132 132 "extended_bounds": {
133 133 "max": filter_settings["end_date"],
134 134 "min": filter_settings["start_date"],
135 135 },
136 136 "field": "timestamp",
137 137 "interval": interval,
138 138 "min_doc_count": 0,
139 139 },
140 140 }
141 141 },
142 142 "query": {
143 "filtered": {
143 "bool": {
144 144 "filter": {
145 145 "and": [
146 146 {
147 147 "terms": {
148 148 "resource_id": [filter_settings["resource"][0]]
149 149 }
150 150 },
151 151 {
152 152 "range": {
153 153 "timestamp": {
154 154 "gte": filter_settings["start_date"],
155 155 "lte": filter_settings["end_date"],
156 156 }
157 157 }
158 158 },
159 159 {"terms": {"namespace": ["appenlight.request_metric"]}},
160 160 ]
161 161 }
162 162 }
163 163 },
164 164 }
165 165
166 166 index_names = es_index_name_limiter(
167 167 start_date=filter_settings["start_date"],
168 168 end_date=filter_settings["end_date"],
169 169 ixtypes=["metrics"],
170 170 )
171 171 if not index_names:
172 172 return []
173 173
174 174 result = Datastores.es.search(
175 175 body=es_query, index=index_names, doc_type="log", size=0
176 176 )
177 177
178 178 plot_data = []
179 179 for item in result["aggregations"]["parent_agg"]["buckets"]:
180 180 x_time = datetime.utcfromtimestamp(int(item["key"]) / 1000)
181 181 point = {"x": x_time}
182 182 for key in ["custom", "main", "nosql", "remote", "requests", "sql", "tmpl"]:
183 183 value = item[key]["sub_agg"]["value"]
184 184 point[key] = round(value, 3) if value else 0
185 185 plot_data.append(point)
186 186
187 187 return plot_data
188 188
189 189 @classmethod
190 190 def get_requests_breakdown(cls, request, filter_settings, db_session=None):
191 191 db_session = get_db_session(db_session)
192 192
193 193 # fetch total time of all requests in this time range
194 194 index_names = es_index_name_limiter(
195 195 start_date=filter_settings["start_date"],
196 196 end_date=filter_settings["end_date"],
197 197 ixtypes=["metrics"],
198 198 )
199 199
200 200 if index_names and filter_settings["resource"]:
201 201 es_query = {
202 202 "aggs": {
203 203 "main": {
204 204 "aggs": {
205 205 "sub_agg": {"sum": {"field": "tags.main.numeric_values"}}
206 206 },
207 207 "filter": {"exists": {"field": "tags.main.numeric_values"}},
208 208 }
209 209 },
210 210 "query": {
211 "filtered": {
211 "bool": {
212 212 "filter": {
213 213 "and": [
214 214 {
215 215 "terms": {
216 216 "resource_id": [filter_settings["resource"][0]]
217 217 }
218 218 },
219 219 {
220 220 "range": {
221 221 "timestamp": {
222 222 "gte": filter_settings["start_date"],
223 223 "lte": filter_settings["end_date"],
224 224 }
225 225 }
226 226 },
227 227 {"terms": {"namespace": ["appenlight.request_metric"]}},
228 228 ]
229 229 }
230 230 }
231 231 },
232 232 }
233 233 result = Datastores.es.search(
234 234 body=es_query, index=index_names, doc_type="log", size=0
235 235 )
236 236 total_time_spent = result["aggregations"]["main"]["sub_agg"]["value"]
237 237 else:
238 238 total_time_spent = 0
239 239 script_text = "doc['tags.main.numeric_values'].value / {}".format(
240 240 total_time_spent
241 241 )
242 242
243 243 if index_names and filter_settings["resource"]:
244 244 es_query = {
245 245 "aggs": {
246 246 "parent_agg": {
247 247 "aggs": {
248 248 "main": {
249 249 "aggs": {
250 250 "sub_agg": {
251 251 "sum": {"field": "tags.main.numeric_values"}
252 252 }
253 253 },
254 254 "filter": {
255 255 "exists": {"field": "tags.main.numeric_values"}
256 256 },
257 257 },
258 258 "percentage": {
259 259 "aggs": {
260 260 "sub_agg": {
261 261 "sum": {
262 262 "lang": "expression",
263 263 "script": script_text,
264 264 }
265 265 }
266 266 },
267 267 "filter": {
268 268 "exists": {"field": "tags.main.numeric_values"}
269 269 },
270 270 },
271 271 "requests": {
272 272 "aggs": {
273 273 "sub_agg": {
274 274 "sum": {"field": "tags.requests.numeric_values"}
275 275 }
276 276 },
277 277 "filter": {
278 278 "exists": {"field": "tags.requests.numeric_values"}
279 279 },
280 280 },
281 281 },
282 282 "terms": {
283 283 "field": "tags.view_name.values",
284 284 "order": {"percentage>sub_agg": "desc"},
285 285 "size": 15,
286 286 },
287 287 }
288 288 },
289 289 "query": {
290 "filtered": {
290 "bool": {
291 291 "filter": {
292 292 "and": [
293 293 {
294 294 "terms": {
295 295 "resource_id": [filter_settings["resource"][0]]
296 296 }
297 297 },
298 298 {
299 299 "range": {
300 300 "timestamp": {
301 301 "gte": filter_settings["start_date"],
302 302 "lte": filter_settings["end_date"],
303 303 }
304 304 }
305 305 },
306 306 ]
307 307 }
308 308 }
309 309 },
310 310 }
311 311 result = Datastores.es.search(
312 312 body=es_query, index=index_names, doc_type="log", size=0
313 313 )
314 314 series = result["aggregations"]["parent_agg"]["buckets"]
315 315 else:
316 316 series = []
317 317
318 318 and_part = [
319 319 {"term": {"resource_id": filter_settings["resource"][0]}},
320 320 {"terms": {"tags.view_name.values": [row["key"] for row in series]}},
321 321 {"term": {"report_type": str(ReportType.slow)}},
322 322 ]
323 323 query = {
324 324 "aggs": {
325 325 "top_reports": {
326 326 "terms": {"field": "tags.view_name.values", "size": len(series)},
327 327 "aggs": {
328 328 "top_calls_hits": {
329 329 "top_hits": {"sort": {"start_time": "desc"}, "size": 5}
330 330 }
331 331 },
332 332 }
333 333 },
334 "query": {"filtered": {"filter": {"and": and_part}}},
334 "query": {"bool": {"filter": {"and": and_part}}},
335 335 }
336 336 details = {}
337 337 index_names = es_index_name_limiter(ixtypes=["reports"])
338 338 if index_names and series:
339 339 result = Datastores.es.search(
340 340 body=query, doc_type="report", size=0, index=index_names
341 341 )
342 342 for bucket in result["aggregations"]["top_reports"]["buckets"]:
343 343 details[bucket["key"]] = []
344 344
345 345 for hit in bucket["top_calls_hits"]["hits"]["hits"]:
346 346 details[bucket["key"]].append(
347 347 {
348 348 "report_id": hit["_source"]["pg_id"],
349 349 "group_id": hit["_source"]["group_id"],
350 350 }
351 351 )
352 352
353 353 results = []
354 354 for row in series:
355 355 result = {
356 356 "key": row["key"],
357 357 "main": row["main"]["sub_agg"]["value"],
358 358 "requests": row["requests"]["sub_agg"]["value"],
359 359 }
360 360 # es can return 'infinity'
361 361 try:
362 362 result["percentage"] = float(row["percentage"]["sub_agg"]["value"])
363 363 except ValueError:
364 364 result["percentage"] = 0
365 365
366 366 result["latest_details"] = details.get(row["key"]) or []
367 367 results.append(result)
368 368
369 369 return results
370 370
371 371 @classmethod
372 372 def get_apdex_stats(cls, request, filter_settings, threshold=1, db_session=None):
373 373 """
374 374         Calculates the APDEX score per server and returns the dashboard
375 375         server information (upper right stats boxes)
376 376 """
377 377 # Apdex t = (Satisfied Count + Tolerated Count / 2) / Total Samples
378 378 db_session = get_db_session(db_session)
379 379 index_names = es_index_name_limiter(
380 380 start_date=filter_settings["start_date"],
381 381 end_date=filter_settings["end_date"],
382 382 ixtypes=["metrics"],
383 383 )
384 384
385 385 requests_series = []
386 386
387 387 if index_names and filter_settings["resource"]:
388 388 es_query = {
389 389 "aggs": {
390 390 "parent_agg": {
391 391 "aggs": {
392 392 "frustrating": {
393 393 "aggs": {
394 394 "sub_agg": {
395 395 "sum": {"field": "tags.requests.numeric_values"}
396 396 }
397 397 },
398 398 "filter": {
399 399 "and": [
400 400 {
401 401 "range": {
402 402 "tags.main.numeric_values": {"gte": "4"}
403 403 }
404 404 },
405 405 {
406 406 "exists": {
407 407 "field": "tags.requests.numeric_values"
408 408 }
409 409 },
410 410 ]
411 411 },
412 412 },
413 413 "main": {
414 414 "aggs": {
415 415 "sub_agg": {
416 416 "sum": {"field": "tags.main.numeric_values"}
417 417 }
418 418 },
419 419 "filter": {
420 420 "exists": {"field": "tags.main.numeric_values"}
421 421 },
422 422 },
423 423 "requests": {
424 424 "aggs": {
425 425 "sub_agg": {
426 426 "sum": {"field": "tags.requests.numeric_values"}
427 427 }
428 428 },
429 429 "filter": {
430 430 "exists": {"field": "tags.requests.numeric_values"}
431 431 },
432 432 },
433 433 "tolerated": {
434 434 "aggs": {
435 435 "sub_agg": {
436 436 "sum": {"field": "tags.requests.numeric_values"}
437 437 }
438 438 },
439 439 "filter": {
440 440 "and": [
441 441 {
442 442 "range": {
443 443 "tags.main.numeric_values": {"gte": "1"}
444 444 }
445 445 },
446 446 {
447 447 "range": {
448 448 "tags.main.numeric_values": {"lt": "4"}
449 449 }
450 450 },
451 451 {
452 452 "exists": {
453 453 "field": "tags.requests.numeric_values"
454 454 }
455 455 },
456 456 ]
457 457 },
458 458 },
459 459 },
460 460 "terms": {"field": "tags.server_name.values", "size": 999999},
461 461 }
462 462 },
463 463 "query": {
464 "filtered": {
464 "bool": {
465 465 "filter": {
466 466 "and": [
467 467 {
468 468 "terms": {
469 469 "resource_id": [filter_settings["resource"][0]]
470 470 }
471 471 },
472 472 {
473 473 "range": {
474 474 "timestamp": {
475 475 "gte": filter_settings["start_date"],
476 476 "lte": filter_settings["end_date"],
477 477 }
478 478 }
479 479 },
480 480 {"terms": {"namespace": ["appenlight.request_metric"]}},
481 481 ]
482 482 }
483 483 }
484 484 },
485 485 }
486 486
487 487 result = Datastores.es.search(
488 488 body=es_query, index=index_names, doc_type="log", size=0
489 489 )
490 490 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
491 491 requests_series.append(
492 492 {
493 493 "frustrating": bucket["frustrating"]["sub_agg"]["value"],
494 494 "main": bucket["main"]["sub_agg"]["value"],
495 495 "requests": bucket["requests"]["sub_agg"]["value"],
496 496 "tolerated": bucket["tolerated"]["sub_agg"]["value"],
497 497 "key": bucket["key"],
498 498 }
499 499 )
500 500
501 501 since_when = filter_settings["start_date"]
502 502 until = filter_settings["end_date"]
503 503
504 504 # total errors
505 505
506 506 index_names = es_index_name_limiter(
507 507 start_date=filter_settings["start_date"],
508 508 end_date=filter_settings["end_date"],
509 509 ixtypes=["reports"],
510 510 )
511 511
512 512 report_series = []
513 513 if index_names and filter_settings["resource"]:
514 514 report_type = ReportType.key_from_value(ReportType.error)
515 515 es_query = {
516 516 "aggs": {
517 517 "parent_agg": {
518 518 "aggs": {
519 519 "errors": {
520 520 "aggs": {
521 521 "sub_agg": {
522 522 "sum": {
523 523 "field": "tags.occurences.numeric_values"
524 524 }
525 525 }
526 526 },
527 527 "filter": {
528 528 "and": [
529 529 {"terms": {"tags.type.values": [report_type]}},
530 530 {
531 531 "exists": {
532 532 "field": "tags.occurences.numeric_values"
533 533 }
534 534 },
535 535 ]
536 536 },
537 537 }
538 538 },
539 539 "terms": {"field": "tags.server_name.values", "size": 999999},
540 540 }
541 541 },
542 542 "query": {
543 "filtered": {
543 "bool": {
544 544 "filter": {
545 545 "and": [
546 546 {
547 547 "terms": {
548 548 "resource_id": [filter_settings["resource"][0]]
549 549 }
550 550 },
551 551 {
552 552 "range": {
553 553 "timestamp": {
554 554 "gte": filter_settings["start_date"],
555 555 "lte": filter_settings["end_date"],
556 556 }
557 557 }
558 558 },
559 559 {"terms": {"namespace": ["appenlight.error"]}},
560 560 ]
561 561 }
562 562 }
563 563 },
564 564 }
565 565 result = Datastores.es.search(
566 566 body=es_query, index=index_names, doc_type="log", size=0
567 567 )
568 568 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
569 569 report_series.append(
570 570 {
571 571 "key": bucket["key"],
572 572 "errors": bucket["errors"]["sub_agg"]["value"],
573 573 }
574 574 )
575 575
576 576 stats = {}
577 577 if UptimeMetricService is not None:
578 578 uptime = UptimeMetricService.get_uptime_by_app(
579 579 filter_settings["resource"][0], since_when=since_when, until=until
580 580 )
581 581 else:
582 582 uptime = 0
583 583
584 584 total_seconds = (until - since_when).total_seconds()
585 585
586 586 for stat in requests_series:
587 587 check_key(stat["key"], stats, uptime, total_seconds)
588 588 stats[stat["key"]]["requests"] = int(stat["requests"])
589 589 stats[stat["key"]]["response_time"] = stat["main"]
590 590 stats[stat["key"]]["tolerated_requests"] = stat["tolerated"]
591 591 stats[stat["key"]]["frustrating_requests"] = stat["frustrating"]
592 592 for server in report_series:
593 593 check_key(server["key"], stats, uptime, total_seconds)
594 594 stats[server["key"]]["errors"] = server["errors"]
595 595
596 596 server_stats = list(stats.values())
597 597 for stat in server_stats:
598 598 stat["satisfying_requests"] = (
599 599 stat["requests"]
600 600 - stat["errors"]
601 601 - stat["frustrating_requests"]
602 602 - stat["tolerated_requests"]
603 603 )
604 604 if stat["satisfying_requests"] < 0:
605 605 stat["satisfying_requests"] = 0
606 606
607 607 if stat["requests"]:
608 608 stat["avg_response_time"] = round(
609 609 stat["response_time"] / stat["requests"], 3
610 610 )
611 611 qual_requests = (
612 612 stat["satisfying_requests"] + stat["tolerated_requests"] / 2.0
613 613 )
614 614 stat["apdex"] = round((qual_requests / stat["requests"]) * 100, 2)
615 615 stat["rpm"] = round(stat["requests"] / stat["total_minutes"], 2)
616 616
617 617 return sorted(server_stats, key=lambda x: x["name"])
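
The Apdex arithmetic in get_apdex_stats follows the comment above: Apdex = (satisfied + tolerated / 2) / total samples, reported here as a percentage. A tiny worked example with made-up per-server numbers:

    requests = 1000
    errors = 20
    frustrating_requests = 50
    tolerated_requests = 200
    satisfying_requests = max(
        requests - errors - frustrating_requests - tolerated_requests, 0
    )  # 730
    apdex = round((satisfying_requests + tolerated_requests / 2.0) / requests * 100, 2)
    assert apdex == 83.0
    total_minutes = 60.0  # a one-hour window
    rpm = round(requests / total_minutes, 2)
    assert rpm == 16.67
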
@@ -1,186 +1,186 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from appenlight.models import get_db_session, Datastores
18 18 from appenlight.models.report import Report
19 19 from appenlight.models.services.base import BaseService
20 20 from appenlight.lib.utils import es_index_name_limiter
21 21
22 22
23 23 class SlowCallService(BaseService):
24 24 @classmethod
25 25 def get_time_consuming_calls(cls, request, filter_settings, db_session=None):
26 26 db_session = get_db_session(db_session)
27 27 # get slow calls from older partitions too
28 28 index_names = es_index_name_limiter(
29 29 start_date=filter_settings["start_date"],
30 30 end_date=filter_settings["end_date"],
31 31 ixtypes=["slow_calls"],
32 32 )
33 33 if index_names and filter_settings["resource"]:
34 34             # get the statement hashes taking the longest total time
35 35 es_query = {
36 36 "aggs": {
37 37 "parent_agg": {
38 38 "aggs": {
39 39 "duration": {
40 40 "aggs": {
41 41 "sub_agg": {
42 42 "sum": {"field": "tags.duration.numeric_values"}
43 43 }
44 44 },
45 45 "filter": {
46 46 "exists": {"field": "tags.duration.numeric_values"}
47 47 },
48 48 },
49 49 "total": {
50 50 "aggs": {
51 51 "sub_agg": {
52 52 "value_count": {
53 53 "field": "tags.statement_hash.values"
54 54 }
55 55 }
56 56 },
57 57 "filter": {
58 58 "exists": {"field": "tags.statement_hash.values"}
59 59 },
60 60 },
61 61 },
62 62 "terms": {
63 63 "field": "tags.statement_hash.values",
64 64 "order": {"duration>sub_agg": "desc"},
65 65 "size": 15,
66 66 },
67 67 }
68 68 },
69 69 "query": {
70 "filtered": {
70 "bool": {
71 71 "filter": {
72 72 "and": [
73 73 {
74 74 "terms": {
75 75 "resource_id": [filter_settings["resource"][0]]
76 76 }
77 77 },
78 78 {
79 79 "range": {
80 80 "timestamp": {
81 81 "gte": filter_settings["start_date"],
82 82 "lte": filter_settings["end_date"],
83 83 }
84 84 }
85 85 },
86 86 ]
87 87 }
88 88 }
89 89 },
90 90 }
91 91 result = Datastores.es.search(
92 92 body=es_query, index=index_names, doc_type="log", size=0
93 93 )
94 94 results = result["aggregations"]["parent_agg"]["buckets"]
95 95 else:
96 96 return []
97 97 hashes = [i["key"] for i in results]
98 98
99 99 # get queries associated with hashes
100 100 calls_query = {
101 101 "aggs": {
102 102 "top_calls": {
103 103 "terms": {"field": "tags.statement_hash.values", "size": 15},
104 104 "aggs": {
105 105 "top_calls_hits": {
106 106 "top_hits": {"sort": {"timestamp": "desc"}, "size": 5}
107 107 }
108 108 },
109 109 }
110 110 },
111 111 "query": {
112 "filtered": {
112 "bool": {
113 113 "filter": {
114 114 "and": [
115 115 {
116 116 "terms": {
117 117 "resource_id": [filter_settings["resource"][0]]
118 118 }
119 119 },
120 120 {"terms": {"tags.statement_hash.values": hashes}},
121 121 {
122 122 "range": {
123 123 "timestamp": {
124 124 "gte": filter_settings["start_date"],
125 125 "lte": filter_settings["end_date"],
126 126 }
127 127 }
128 128 },
129 129 ]
130 130 }
131 131 }
132 132 },
133 133 }
134 134 calls = Datastores.es.search(
135 135 body=calls_query, index=index_names, doc_type="log", size=0
136 136 )
137 137 call_results = {}
138 138 report_ids = []
139 139 for call in calls["aggregations"]["top_calls"]["buckets"]:
140 140 hits = call["top_calls_hits"]["hits"]["hits"]
141 141 call_results[call["key"]] = [i["_source"] for i in hits]
142 142 report_ids.extend(
143 143 [i["_source"]["tags"]["report_id"]["values"] for i in hits]
144 144 )
145 145 if report_ids:
146 146 r_query = db_session.query(Report.group_id, Report.id)
147 147 r_query = r_query.filter(Report.id.in_(report_ids))
148 148 r_query = r_query.filter(Report.start_time >= filter_settings["start_date"])
149 149 else:
150 150 r_query = []
151 151 reports_reversed = {}
152 152 for report in r_query:
153 153 reports_reversed[report.id] = report.group_id
154 154
155 155 final_results = []
156 156 for item in results:
157 157 if item["key"] not in call_results:
158 158 continue
159 159 call = call_results[item["key"]][0]
160 160 row = {
161 161 "occurences": item["total"]["sub_agg"]["value"],
162 162 "total_duration": round(item["duration"]["sub_agg"]["value"]),
163 163 "statement": call["message"],
164 164 "statement_type": call["tags"]["type"]["values"],
165 165 "statement_subtype": call["tags"]["subtype"]["values"],
166 166 "statement_hash": item["key"],
167 167 "latest_details": [],
168 168 }
169 169 if row["statement_type"] in ["tmpl", " remote"]:
170 170 params = (
171 171 call["tags"]["parameters"]["values"]
172 172 if "parameters" in call["tags"]
173 173 else ""
174 174 )
175 175 row["statement"] = "{} ({})".format(call["message"], params)
176 176 for call in call_results[item["key"]]:
177 177 report_id = call["tags"]["report_id"]["values"]
178 178 group_id = reports_reversed.get(report_id)
179 179 if group_id:
180 180 row["latest_details"].append(
181 181 {"group_id": group_id, "report_id": report_id}
182 182 )
183 183
184 184 final_results.append(row)
185 185
186 186 return final_results
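
get_time_consuming_calls works in two passes: the first aggregation ranks statement hashes by total duration, the second pulls the latest hits per hash, and the report ids found in those hits are mapped back to report groups through SQLAlchemy. A minimal offline sketch of that last mapping step (ids are made up):

    hits = [
        {"_source": {"tags": {"report_id": {"values": 11}}}},
        {"_source": {"tags": {"report_id": {"values": 12}}}},
    ]
    reports_reversed = {11: 7}  # report_id -> group_id, filled from the SQLAlchemy query
    latest_details = []
    for hit in hits:
        report_id = hit["_source"]["tags"]["report_id"]["values"]
        group_id = reports_reversed.get(report_id)
        if group_id:  # ids without a matching report (e.g. 12) are skipped
            latest_details.append({"group_id": group_id, "report_id": report_id})
    assert latest_details == [{"group_id": 7, "report_id": 11}]
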
@@ -1,222 +1,222 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import logging
18 18 from datetime import datetime, timedelta
19 19
20 20 from pyramid.view import view_config
21 21 from pyramid.httpexceptions import HTTPUnprocessableEntity
22 22 from appenlight.models import Datastores, Log
23 23 from appenlight.models.services.log import LogService
24 24 from appenlight.lib.utils import (
25 25 build_filter_settings_from_query_dict,
26 26 es_index_name_limiter,
27 27 )
28 28 from appenlight.lib.helpers import gen_pagination_headers
29 29 from appenlight.celery.tasks import logs_cleanup
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33 section_filters_key = "appenlight:logs:filter:%s"
34 34
35 35
36 36 @view_config(route_name="logs_no_id", renderer="json", permission="authenticated")
37 37 def fetch_logs(request):
38 38 """
39 39 Returns list of log entries from Elasticsearch
40 40 """
41 41
42 42 filter_settings = build_filter_settings_from_query_dict(
43 43 request, request.GET.mixed()
44 44 )
45 45 logs_paginator = LogService.get_paginator_by_app_ids(
46 46 app_ids=filter_settings["resource"],
47 47 page=filter_settings["page"],
48 48 filter_settings=filter_settings,
49 49 )
50 50 headers = gen_pagination_headers(request, logs_paginator)
51 51 request.response.headers.update(headers)
52 52
53 53 return [l.get_dict() for l in logs_paginator.sa_items]
54 54
55 55
56 56 @view_config(
57 57 route_name="section_view",
58 58 match_param=["section=logs_section", "view=fetch_series"],
59 59 renderer="json",
60 60 permission="authenticated",
61 61 )
62 62 def logs_fetch_series(request):
63 63 """
64 64 Handles metric dashboard graphs
65 65 Returns information for time/tier breakdown
66 66 """
67 67 filter_settings = build_filter_settings_from_query_dict(
68 68 request, request.GET.mixed()
69 69 )
70 70 paginator = LogService.get_paginator_by_app_ids(
71 71 app_ids=filter_settings["resource"],
72 72 page=1,
73 73 filter_settings=filter_settings,
74 74 items_per_page=1,
75 75 )
76 76 now = datetime.utcnow().replace(microsecond=0, second=0)
77 77 delta = timedelta(days=7)
78 78 if paginator.sa_items:
79 79 start_date = paginator.sa_items[-1].timestamp.replace(microsecond=0, second=0)
80 80 filter_settings["start_date"] = start_date - delta
81 81 else:
82 82 filter_settings["start_date"] = now - delta
83 83 filter_settings["end_date"] = filter_settings["start_date"] + timedelta(days=7)
84 84
85 85 @request.registry.cache_regions.redis_sec_30.cache_on_arguments("logs_graphs")
86 86 def cached(apps, search_params, delta, now):
87 87 data = LogService.get_time_series_aggregate(
88 88 filter_settings["resource"], filter_settings
89 89 )
90 90 if not data:
91 91 return []
92 92 buckets = data["aggregations"]["events_over_time"]["buckets"]
93 93 return [
94 94 {
95 95 "x": datetime.utcfromtimestamp(item["key"] / 1000),
96 96 "logs": item["doc_count"],
97 97 }
98 98 for item in buckets
99 99 ]
100 100
101 101 return cached(filter_settings, request.GET.mixed(), delta, now)
102 102
103 103
104 104 @view_config(
105 105 route_name="logs_no_id",
106 106 renderer="json",
107 107 request_method="DELETE",
108 108 permission="authenticated",
109 109 )
110 110 def logs_mass_delete(request):
111 111 params = request.GET.mixed()
112 112 if "resource" not in params:
113 113 raise HTTPUnprocessableEntity()
114 114 # this might be '' and then colander will not validate the schema
115 115 if not params.get("namespace"):
116 116 params.pop("namespace", None)
117 117 filter_settings = build_filter_settings_from_query_dict(
118 118 request, params, resource_permissions=["update_reports"]
119 119 )
120 120
121 121 resource_id = list(filter_settings["resource"])[0]
122 122     # filter settings returns a list of all of the user's applications
123 123     # if the app does not match we normally would not care, since it is only used for search,
124 124     # but here a user playing with params could wipe out their whole data
125 125 if int(resource_id) != int(params["resource"]):
126 126 raise HTTPUnprocessableEntity()
127 127
128 128 logs_cleanup.delay(resource_id, filter_settings)
129 129 msg = (
130 130 "Log cleanup process started - it may take a while for "
131 131 "everything to get removed"
132 132 )
133 133 request.session.flash(msg)
134 134 return {}
135 135
136 136
137 137 @view_config(
138 138 route_name="section_view",
139 139 match_param=("view=common_tags", "section=logs_section"),
140 140 renderer="json",
141 141 permission="authenticated",
142 142 )
143 143 def common_tags(request):
144 144 config = request.GET.mixed()
145 145 filter_settings = build_filter_settings_from_query_dict(request, config)
146 146
147 147 resources = list(filter_settings["resource"])
148 148 query = {
149 149 "query": {
150 "filtered": {
150 "bool": {
151 151 "filter": {"and": [{"terms": {"resource_id": list(resources)}}]}
152 152 }
153 153 }
154 154 }
155 155 start_date = filter_settings.get("start_date")
156 156 end_date = filter_settings.get("end_date")
157 filter_part = query["query"]["filtered"]["filter"]["and"]
157 filter_part = query["query"]["bool"]["filter"]["and"]
158 158
159 159 date_range = {"range": {"timestamp": {}}}
160 160 if start_date:
161 161 date_range["range"]["timestamp"]["gte"] = start_date
162 162 if end_date:
163 163 date_range["range"]["timestamp"]["lte"] = end_date
164 164 if start_date or end_date:
165 165 filter_part.append(date_range)
166 166
167 167 levels = filter_settings.get("level")
168 168 if levels:
169 169 filter_part.append({"terms": {"log_level": levels}})
170 170 namespaces = filter_settings.get("namespace")
171 171 if namespaces:
172 172 filter_part.append({"terms": {"namespace": namespaces}})
173 173
174 174 query["aggs"] = {"sub_agg": {"terms": {"field": "tag_list", "size": 50}}}
175 175 # tags
176 176 index_names = es_index_name_limiter(ixtypes=[config.get("datasource", "logs")])
177 177 result = Datastores.es.search(body=query, index=index_names, doc_type="log", size=0)
178 178 tag_buckets = result["aggregations"]["sub_agg"].get("buckets", [])
179 179 # namespaces
180 180 query["aggs"] = {"sub_agg": {"terms": {"field": "namespace", "size": 50}}}
181 181 result = Datastores.es.search(body=query, index=index_names, doc_type="log", size=0)
182 182 namespaces_buckets = result["aggregations"]["sub_agg"].get("buckets", [])
183 183 return {
184 184 "tags": [item["key"] for item in tag_buckets],
185 185 "namespaces": [item["key"] for item in namespaces_buckets],
186 186 }
187 187
188 188
189 189 @view_config(
190 190 route_name="section_view",
191 191 match_param=("view=common_values", "section=logs_section"),
192 192 renderer="json",
193 193 permission="authenticated",
194 194 )
195 195 def common_values(request):
196 196 config = request.GET.mixed()
197 197 datasource = config.pop("datasource", "logs")
198 198 filter_settings = build_filter_settings_from_query_dict(request, config)
199 199 resources = list(filter_settings["resource"])
200 200 tag_name = filter_settings["tags"][0]["value"][0]
201 201
202 202 and_part = [
203 203 {"terms": {"resource_id": list(resources)}},
204 204 ]
205 205 if filter_settings["namespace"]:
206 206 and_part.append({"terms": {"namespace": filter_settings["namespace"]}})
207 207 query = {
208 208 "query": {
209 "filtered": {
209 "bool": {
210 210 "filter": {
211 211 "and": and_part
212 212 }
213 213 }
214 214 }
215 215 }
216 216 query["aggs"] = {
217 217 "sub_agg": {"terms": {"field": "tags.{}.values".format(tag_name), "size": 50}}
218 218 }
219 219 index_names = es_index_name_limiter(ixtypes=[datasource])
220 220 result = Datastores.es.search(body=query, index=index_names, doc_type="log", size=0)
221 221 values_buckets = result["aggregations"]["sub_agg"].get("buckets", [])
222 222 return {"values": [item["key"] for item in values_buckets]}
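
For reference, the body that common_values ends up sending for a hypothetical tag "browser" on resource 1 with a namespace filter looks like the sketch below (values are illustrative; the legacy "and" wrapper inside "filter" is kept, matching the code above). Datastores.es.search(body=query, index=index_names, doc_type="log", size=0) then returns only the aggregation buckets.

    tag_name = "browser"
    and_part = [
        {"terms": {"resource_id": [1]}},
        {"terms": {"namespace": ["myapp.views"]}},
    ]
    query = {
        "query": {"bool": {"filter": {"and": and_part}}},
        "aggs": {
            "sub_agg": {
                "terms": {"field": "tags.{}.values".format(tag_name), "size": 50}
            }
        },
    }
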