elasticsearch: move to single doctype indices
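The change below migrates AppEnlight's Elasticsearch documents to single-doctype indices: parent/child `_parent` metadata is replaced with a `join_field`, and the generic `pg_id` fields are renamed to per-entity ids (`report_id`, `log_id`, `metric_id`). As a point of reference only, here is a minimal sketch of what a compatible index mapping could look like in Elasticsearch 6.x (which the join field suggests is the target); the host, index name and field subset are assumptions for illustration and are not part of the commit.

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])  # assumed host

mapping = {
    "mappings": {
        "report": {  # the single mapping type for this index
            "properties": {
                # parent/child relation expressed as a join field
                "join_field": {
                    "type": "join",
                    "relations": {"report_group": "report"},
                },
                "error": {"type": "text"},
                "resource_id": {"type": "integer"},
            }
        }
    }
}
es.indices.create(index="rcae_r_2017_10", body=mapping)  # assumed index name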
@@ -1,707 +1,707 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import bisect
18 18 import collections
19 19 import math
20 20 from datetime import datetime, timedelta
21 21
22 22 import sqlalchemy as sa
23 23 import elasticsearch.exceptions
24 24 import elasticsearch.helpers
25 25
26 26 from celery.utils.log import get_task_logger
27 27 from zope.sqlalchemy import mark_changed
28 28 from pyramid.threadlocal import get_current_request, get_current_registry
29 29 from ziggurat_foundations.models.services.resource import ResourceService
30 30
31 31 from appenlight.celery import celery
32 32 from appenlight.models.report_group import ReportGroup
33 33 from appenlight.models import DBSession, Datastores
34 34 from appenlight.models.report import Report
35 35 from appenlight.models.log import Log
36 36 from appenlight.models.metric import Metric
37 37 from appenlight.models.event import Event
38 38
39 39 from appenlight.models.services.application import ApplicationService
40 40 from appenlight.models.services.event import EventService
41 41 from appenlight.models.services.log import LogService
42 42 from appenlight.models.services.report import ReportService
43 43 from appenlight.models.services.report_group import ReportGroupService
44 44 from appenlight.models.services.user import UserService
45 45 from appenlight.models.tag import Tag
46 46 from appenlight.lib import print_traceback
47 47 from appenlight.lib.utils import parse_proto, in_batches
48 48 from appenlight.lib.ext_json import json
49 49 from appenlight.lib.redis_keys import REDIS_KEYS
50 50 from appenlight.lib.enums import ReportType
51 51
52 52 log = get_task_logger(__name__)
53 53
54 54 sample_boundries = (
55 55 list(range(100, 1000, 100))
56 56 + list(range(1000, 10000, 1000))
57 57 + list(range(10000, 100000, 5000))
58 58 )
59 59
60 60
61 61 def pick_sample(total_occurences, report_type=None):
62 62 every = 1.0
63 63 position = bisect.bisect_left(sample_boundries, total_occurences)
64 64 if position > 0:
65 65 if report_type == ReportType.not_found:
66 66 divide = 10.0
67 67 else:
68 68 divide = 100.0
69 69 every = sample_boundries[position - 1] / divide
70 70 return total_occurences % every == 0
71 71
72 72
73 73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
74 74 def test_exception_task():
75 75 log.error("test celery log", extra={"location": "celery"})
76 76 log.warning("test celery log", extra={"location": "celery"})
77 77 raise Exception("Celery exception test")
78 78
79 79
80 80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
81 81 def test_retry_exception_task():
82 82 try:
83 83 import time
84 84
85 85 time.sleep(1.3)
86 86 log.error("test retry celery log", extra={"location": "celery"})
87 87 log.warning("test retry celery log", extra={"location": "celery"})
88 88 raise Exception("Celery exception test")
89 89 except Exception as exc:
90 90 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
91 91 raise
92 92 test_retry_exception_task.retry(exc=exc)
93 93
94 94
95 95 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
96 96 def add_reports(resource_id, request_params, dataset, **kwargs):
97 97 proto_version = parse_proto(request_params.get("protocol_version", ""))
98 98 current_time = datetime.utcnow().replace(second=0, microsecond=0)
99 99 try:
100 100 # we will store ES docs here for a single bulk insert
101 101 es_report_docs = {}
102 102 es_report_group_docs = {}
103 103 resource = ApplicationService.by_id(resource_id)
104 104
105 105 tags = []
106 106 es_slow_calls_docs = {}
107 107 es_reports_stats_rows = {}
108 108 for report_data in dataset:
109 109 # build report details for later
110 110 added_details = 0
111 111 report = Report()
112 112 report.set_data(report_data, resource, proto_version)
113 113 report._skip_ft_index = True
114 114
115 115 # find latest group in this month's partition
116 116 report_group = ReportGroupService.by_hash_and_resource(
117 117 report.resource_id,
118 118 report.grouping_hash,
119 119 since_when=datetime.utcnow().date().replace(day=1),
120 120 )
121 121 occurences = report_data.get("occurences", 1)
122 122 if not report_group:
123 123 # total_reports will be incremented a moment later
124 124 report_group = ReportGroup(
125 125 grouping_hash=report.grouping_hash,
126 126 occurences=0,
127 127 total_reports=0,
128 128 last_report=0,
129 129 priority=report.priority,
130 130 error=report.error,
131 131 first_timestamp=report.start_time,
132 132 )
133 133 report_group._skip_ft_index = True
134 134 report_group.report_type = report.report_type
135 135 report.report_group_time = report_group.first_timestamp
136 136 add_sample = pick_sample(
137 137 report_group.occurences, report_type=report_group.report_type
138 138 )
139 139 if add_sample:
140 140 resource.report_groups.append(report_group)
141 141 report_group.reports.append(report)
142 142 added_details += 1
143 143 DBSession.flush()
144 144 if report.partition_id not in es_report_docs:
145 145 es_report_docs[report.partition_id] = []
146 146 es_report_docs[report.partition_id].append(report.es_doc())
147 147 tags.extend(list(report.tags.items()))
148 148 slow_calls = report.add_slow_calls(report_data, report_group)
149 149 DBSession.flush()
150 150 for s_call in slow_calls:
151 151 if s_call.partition_id not in es_slow_calls_docs:
152 152 es_slow_calls_docs[s_call.partition_id] = []
153 153 es_slow_calls_docs[s_call.partition_id].append(s_call.es_doc())
154 154 # try generating new stat rows if needed
155 155 else:
156 156 # required for postprocessing to not fail later
157 157 report.report_group = report_group
158 158
159 159 stat_row = ReportService.generate_stat_rows(report, resource, report_group)
160 160 if stat_row.partition_id not in es_reports_stats_rows:
161 161 es_reports_stats_rows[stat_row.partition_id] = []
162 162 es_reports_stats_rows[stat_row.partition_id].append(stat_row.es_doc())
163 163
164 164 # see if we should mark the 10th occurrence of the report
165 165 last_occurences_10 = int(math.floor(report_group.occurences / 10))
166 166 curr_occurences_10 = int(
167 167 math.floor((report_group.occurences + report.occurences) / 10)
168 168 )
169 169 last_occurences_100 = int(math.floor(report_group.occurences / 100))
170 170 curr_occurences_100 = int(
171 171 math.floor((report_group.occurences + report.occurences) / 100)
172 172 )
173 173 notify_occurences_10 = last_occurences_10 != curr_occurences_10
174 174 notify_occurences_100 = last_occurences_100 != curr_occurences_100
175 175 report_group.occurences = ReportGroup.occurences + occurences
176 176 report_group.last_timestamp = report.start_time
177 177 report_group.summed_duration = ReportGroup.summed_duration + report.duration
178 178 summed_duration = ReportGroup.summed_duration + report.duration
179 179 summed_occurences = ReportGroup.occurences + occurences
180 180 report_group.average_duration = summed_duration / summed_occurences
181 181 report_group.run_postprocessing(report)
182 182 if added_details:
183 183 report_group.total_reports = ReportGroup.total_reports + 1
184 184 report_group.last_report = report.id
185 185 report_group.set_notification_info(
186 186 notify_10=notify_occurences_10, notify_100=notify_occurences_100
187 187 )
188 188 DBSession.flush()
189 189 report_group.get_report().notify_channel(report_group)
190 190 if report_group.partition_id not in es_report_group_docs:
191 191 es_report_group_docs[report_group.partition_id] = []
192 192 es_report_group_docs[report_group.partition_id].append(
193 193 report_group.es_doc()
194 194 )
195 195
196 196 action = "REPORT"
197 197 log_msg = "%s: %s %s, client: %s, proto: %s" % (
198 198 action,
199 199 report_data.get("http_status", "unknown"),
200 200 str(resource),
201 201 report_data.get("client"),
202 202 proto_version,
203 203 )
204 204 log.info(log_msg)
205 205 total_reports = len(dataset)
206 206 redis_pipeline = Datastores.redis.pipeline(transaction=False)
207 207 key = REDIS_KEYS["counters"]["reports_per_minute"].format(current_time)
208 208 redis_pipeline.incr(key, total_reports)
209 209 redis_pipeline.expire(key, 3600 * 24)
210 210 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
211 211 resource.owner_user_id, current_time
212 212 )
213 213 redis_pipeline.incr(key, total_reports)
214 214 redis_pipeline.expire(key, 3600)
215 215 key = REDIS_KEYS["counters"]["reports_per_hour_per_app"].format(
216 216 resource_id, current_time.replace(minute=0)
217 217 )
218 218 redis_pipeline.incr(key, total_reports)
219 219 redis_pipeline.expire(key, 3600 * 24 * 7)
220 220 redis_pipeline.sadd(
221 221 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
222 222 current_time.replace(minute=0)
223 223 ),
224 224 resource_id,
225 225 )
226 226 redis_pipeline.execute()
227 227
228 228 add_reports_es(es_report_group_docs, es_report_docs)
229 229 add_reports_slow_calls_es(es_slow_calls_docs)
230 230 add_reports_stats_rows_es(es_reports_stats_rows)
231 231 return True
232 232 except Exception as exc:
233 233 print_traceback(log)
234 234 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
235 235 raise
236 236 add_reports.retry(exc=exc)
237 237
238 238
239 239 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
240 240 def add_reports_es(report_group_docs, report_docs):
241 241 for k, v in report_group_docs.items():
242 to_update = {"_index": k, "_type": "report_group"}
242 to_update = {"_index": k, "_type": "report"}
243 243 [i.update(to_update) for i in v]
244 244 elasticsearch.helpers.bulk(Datastores.es, v)
245 245 for k, v in report_docs.items():
246 246 to_update = {"_index": k, "_type": "report"}
247 247 [i.update(to_update) for i in v]
248 248 elasticsearch.helpers.bulk(Datastores.es, v)
249 249
250 250
251 251 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
252 252 def add_reports_slow_calls_es(es_docs):
253 253 for k, v in es_docs.items():
254 254 to_update = {"_index": k, "_type": "log"}
255 255 [i.update(to_update) for i in v]
256 256 elasticsearch.helpers.bulk(Datastores.es, v)
257 257
258 258
259 259 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
260 260 def add_reports_stats_rows_es(es_docs):
261 261 for k, v in es_docs.items():
262 to_update = {"_index": k, "_type": "log"}
262 to_update = {"_index": k, "_type": "report"}
263 263 [i.update(to_update) for i in v]
264 264 elasticsearch.helpers.bulk(Datastores.es, v)
265 265
266 266
267 267 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
268 268 def add_logs(resource_id, request_params, dataset, **kwargs):
269 269 proto_version = request_params.get("protocol_version")
270 270 current_time = datetime.utcnow().replace(second=0, microsecond=0)
271 271
272 272 try:
273 273 es_docs = collections.defaultdict(list)
274 274 resource = ApplicationService.by_id_cached()(resource_id)
275 275 resource = DBSession.merge(resource, load=False)
276 276 ns_pairs = []
277 277 for entry in dataset:
278 278 # gather pk and ns so we can remove older versions of row later
279 279 if entry["primary_key"] is not None:
280 280 ns_pairs.append({"pk": entry["primary_key"], "ns": entry["namespace"]})
281 281 log_entry = Log()
282 282 log_entry.set_data(entry, resource=resource)
283 283 log_entry._skip_ft_index = True
284 284 resource.logs.append(log_entry)
285 285 DBSession.flush()
286 286 # insert non pk rows first
287 287 if entry["primary_key"] is None:
288 288 es_docs[log_entry.partition_id].append(log_entry.es_doc())
289 289
290 # 2nd pass to delete all log entries from db foe same pk/ns pair
290 # 2nd pass to delete all log entries from db for same pk/ns pair
291 291 if ns_pairs:
292 292 ids_to_delete = []
293 293 es_docs = collections.defaultdict(list)
294 294 es_docs_to_delete = collections.defaultdict(list)
295 295 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
296 296 list_of_pairs=ns_pairs
297 297 )
298 298 log_dict = {}
299 299 for log_entry in found_pkey_logs:
300 300 log_key = (log_entry.primary_key, log_entry.namespace)
301 301 if log_key not in log_dict:
302 302 log_dict[log_key] = []
303 303 log_dict[log_key].append(log_entry)
304 304
305 305 for ns, entry_list in log_dict.items():
306 306 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
307 307 # newest row needs to be indexed in es
308 308 log_entry = entry_list[-1]
309 309 # delete everything from pg and ES, leave the last row in pg
310 310 for e in entry_list[:-1]:
311 311 ids_to_delete.append(e.log_id)
312 312 es_docs_to_delete[e.partition_id].append(e.delete_hash)
313 313
314 314 es_docs_to_delete[log_entry.partition_id].append(log_entry.delete_hash)
315 315
316 316 es_docs[log_entry.partition_id].append(log_entry.es_doc())
317 317
318 318 if ids_to_delete:
319 319 query = DBSession.query(Log).filter(Log.log_id.in_(ids_to_delete))
320 320 query.delete(synchronize_session=False)
321 321 if es_docs_to_delete:
322 322 # batch this to avoid problems with default ES bulk limits
323 323 for es_index in es_docs_to_delete.keys():
324 324 for batch in in_batches(es_docs_to_delete[es_index], 20):
325 325 query = {"query": {"terms": {"delete_hash": batch}}}
326 326
327 327 try:
328 328 Datastores.es.delete_by_query(
329 329 index=es_index, doc_type="log",
330 330 body=query, conflicts="proceed"
331 331 )
332 332 except elasticsearch.exceptions.NotFoundError as exc:
333 333 msg = "skipping index {}".format(es_index)
334 334 log.info(msg)
335 335
336 336 total_logs = len(dataset)
337 337
338 338 log_msg = "LOG_NEW: %s, entries: %s, proto:%s" % (
339 339 str(resource),
340 340 total_logs,
341 341 proto_version,
342 342 )
343 343 log.info(log_msg)
344 344 # mark_changed(session)
345 345 redis_pipeline = Datastores.redis.pipeline(transaction=False)
346 346 key = REDIS_KEYS["counters"]["logs_per_minute"].format(current_time)
347 347 redis_pipeline.incr(key, total_logs)
348 348 redis_pipeline.expire(key, 3600 * 24)
349 349 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
350 350 resource.owner_user_id, current_time
351 351 )
352 352 redis_pipeline.incr(key, total_logs)
353 353 redis_pipeline.expire(key, 3600)
354 354 key = REDIS_KEYS["counters"]["logs_per_hour_per_app"].format(
355 355 resource_id, current_time.replace(minute=0)
356 356 )
357 357 redis_pipeline.incr(key, total_logs)
358 358 redis_pipeline.expire(key, 3600 * 24 * 7)
359 359 redis_pipeline.sadd(
360 360 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
361 361 current_time.replace(minute=0)
362 362 ),
363 363 resource_id,
364 364 )
365 365 redis_pipeline.execute()
366 366 add_logs_es(es_docs)
367 367 return True
368 368 except Exception as exc:
369 369 print_traceback(log)
370 370 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
371 371 raise
372 372 add_logs.retry(exc=exc)
373 373
374 374
375 375 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
376 376 def add_logs_es(es_docs):
377 377 for k, v in es_docs.items():
378 378 to_update = {"_index": k, "_type": "log"}
379 379 [i.update(to_update) for i in v]
380 380 elasticsearch.helpers.bulk(Datastores.es, v)
381 381
382 382
383 383 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
384 384 def add_metrics(resource_id, request_params, dataset, proto_version):
385 385 current_time = datetime.utcnow().replace(second=0, microsecond=0)
386 386 try:
387 387 resource = ApplicationService.by_id_cached()(resource_id)
388 388 resource = DBSession.merge(resource, load=False)
389 389 es_docs = []
390 390 rows = []
391 391 for metric in dataset:
392 392 tags = dict(metric["tags"])
393 393 server_n = tags.get("server_name", metric["server_name"]).lower()
394 394 tags["server_name"] = server_n or "unknown"
395 395 new_metric = Metric(
396 396 timestamp=metric["timestamp"],
397 397 resource_id=resource.resource_id,
398 398 namespace=metric["namespace"],
399 399 tags=tags,
400 400 )
401 401 rows.append(new_metric)
402 402 es_docs.append(new_metric.es_doc())
403 403 session = DBSession()
404 404 session.bulk_save_objects(rows)
405 405 session.flush()
406 406
407 407 action = "METRICS"
408 408 metrics_msg = "%s: %s, metrics: %s, proto:%s" % (
409 409 action,
410 410 str(resource),
411 411 len(dataset),
412 412 proto_version,
413 413 )
414 414 log.info(metrics_msg)
415 415
416 416 mark_changed(session)
417 417 redis_pipeline = Datastores.redis.pipeline(transaction=False)
418 418 key = REDIS_KEYS["counters"]["metrics_per_minute"].format(current_time)
419 419 redis_pipeline.incr(key, len(rows))
420 420 redis_pipeline.expire(key, 3600 * 24)
421 421 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
422 422 resource.owner_user_id, current_time
423 423 )
424 424 redis_pipeline.incr(key, len(rows))
425 425 redis_pipeline.expire(key, 3600)
426 426 key = REDIS_KEYS["counters"]["metrics_per_hour_per_app"].format(
427 427 resource_id, current_time.replace(minute=0)
428 428 )
429 429 redis_pipeline.incr(key, len(rows))
430 430 redis_pipeline.expire(key, 3600 * 24 * 7)
431 431 redis_pipeline.sadd(
432 432 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
433 433 current_time.replace(minute=0)
434 434 ),
435 435 resource_id,
436 436 )
437 437 redis_pipeline.execute()
438 438 add_metrics_es(es_docs)
439 439 return True
440 440 except Exception as exc:
441 441 print_traceback(log)
442 442 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
443 443 raise
444 444 add_metrics.retry(exc=exc)
445 445
446 446
447 447 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
448 448 def add_metrics_es(es_docs):
449 449 for doc in es_docs:
450 450 partition = "rcae_m_%s" % doc["timestamp"].strftime("%Y_%m_%d")
451 451 Datastores.es.index(partition, "log", doc)
452 452
453 453
454 454 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
455 455 def check_user_report_notifications(resource_id):
456 456 since_when = datetime.utcnow()
457 457 try:
458 458 request = get_current_request()
459 459 application = ApplicationService.by_id(resource_id)
460 460 if not application:
461 461 return
462 462 error_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
463 463 ReportType.error, resource_id
464 464 )
465 465 slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
466 466 ReportType.slow, resource_id
467 467 )
468 468 error_group_ids = Datastores.redis.smembers(error_key)
469 469 slow_group_ids = Datastores.redis.smembers(slow_key)
470 470 Datastores.redis.delete(error_key)
471 471 Datastores.redis.delete(slow_key)
472 472 err_gids = [int(g_id) for g_id in error_group_ids]
473 473 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
474 474 group_ids = err_gids + slow_gids
475 475 occurence_dict = {}
476 476 for g_id in group_ids:
477 477 key = REDIS_KEYS["counters"]["report_group_occurences"].format(g_id)
478 478 val = Datastores.redis.get(key)
479 479 Datastores.redis.delete(key)
480 480 if val:
481 481 occurence_dict[g_id] = int(val)
482 482 else:
483 483 occurence_dict[g_id] = 1
484 484 report_groups = ReportGroupService.by_ids(group_ids)
485 485 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
486 486
487 487 ApplicationService.check_for_groups_alert(
488 488 application,
489 489 "alert",
490 490 report_groups=report_groups,
491 491 occurence_dict=occurence_dict,
492 492 )
493 493 users = set(
494 494 [p.user for p in ResourceService.users_for_perm(application, "view")]
495 495 )
496 496 report_groups = report_groups.all()
497 497 for user in users:
498 498 UserService.report_notify(
499 499 user,
500 500 request,
501 501 application,
502 502 report_groups=report_groups,
503 503 occurence_dict=occurence_dict,
504 504 )
505 505 for group in report_groups:
506 506 # marks report_groups as notified
507 507 if not group.notified:
508 508 group.notified = True
509 509 except Exception as exc:
510 510 print_traceback(log)
511 511 raise
512 512
513 513
514 514 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
515 515 def check_alerts(resource_id):
516 516 since_when = datetime.utcnow()
517 517 try:
518 518 request = get_current_request()
519 519 application = ApplicationService.by_id(resource_id)
520 520 if not application:
521 521 return
522 522 error_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
523 523 ReportType.error, resource_id
524 524 )
525 525 slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
526 526 ReportType.slow, resource_id
527 527 )
528 528 error_group_ids = Datastores.redis.smembers(error_key)
529 529 slow_group_ids = Datastores.redis.smembers(slow_key)
530 530 Datastores.redis.delete(error_key)
531 531 Datastores.redis.delete(slow_key)
532 532 err_gids = [int(g_id) for g_id in error_group_ids]
533 533 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
534 534 group_ids = err_gids + slow_gids
535 535 occurence_dict = {}
536 536 for g_id in group_ids:
537 537 key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format(
538 538 g_id
539 539 )
540 540 val = Datastores.redis.get(key)
541 541 Datastores.redis.delete(key)
542 542 if val:
543 543 occurence_dict[g_id] = int(val)
544 544 else:
545 545 occurence_dict[g_id] = 1
546 546 report_groups = ReportGroupService.by_ids(group_ids)
547 547 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
548 548
549 549 ApplicationService.check_for_groups_alert(
550 550 application,
551 551 "alert",
552 552 report_groups=report_groups,
553 553 occurence_dict=occurence_dict,
554 554 since_when=since_when,
555 555 )
556 556 except Exception as exc:
557 557 print_traceback(log)
558 558 raise
559 559
560 560
561 561 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
562 562 def close_alerts():
563 563 log.warning("Checking alerts")
564 564 since_when = datetime.utcnow()
565 565 try:
566 566 event_types = [
567 567 Event.types["error_report_alert"],
568 568 Event.types["slow_report_alert"],
569 569 ]
570 570 statuses = [Event.statuses["active"]]
571 571 # get events older than 5 min
572 572 events = EventService.by_type_and_status(
573 573 event_types, statuses, older_than=(since_when - timedelta(minutes=5))
574 574 )
575 575 for event in events:
576 576 # see if we can close them
577 577 event.validate_or_close(since_when=(since_when - timedelta(minutes=1)))
578 578 except Exception as exc:
579 579 print_traceback(log)
580 580 raise
581 581
582 582
583 583 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
584 584 def update_tag_counter(tag_name, tag_value, count):
585 585 try:
586 586 query = (
587 587 DBSession.query(Tag)
588 588 .filter(Tag.name == tag_name)
589 589 .filter(
590 590 sa.cast(Tag.value, sa.types.TEXT)
591 591 == sa.cast(json.dumps(tag_value), sa.types.TEXT)
592 592 )
593 593 )
594 594 query.update(
595 595 {"times_seen": Tag.times_seen + count, "last_timestamp": datetime.utcnow()},
596 596 synchronize_session=False,
597 597 )
598 598 session = DBSession()
599 599 mark_changed(session)
600 600 return True
601 601 except Exception as exc:
602 602 print_traceback(log)
603 603 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
604 604 raise
605 605 update_tag_counter.retry(exc=exc)
606 606
607 607
608 608 @celery.task(queue="default")
609 609 def update_tag_counters():
610 610 """
611 611 Sets task to update counters for application tags
612 612 """
613 613 tags = Datastores.redis.lrange(REDIS_KEYS["seen_tag_list"], 0, -1)
614 614 Datastores.redis.delete(REDIS_KEYS["seen_tag_list"])
615 615 c = collections.Counter(tags)
616 616 for t_json, count in c.items():
617 617 tag_info = json.loads(t_json)
618 618 update_tag_counter.delay(tag_info[0], tag_info[1], count)
619 619
620 620
621 621 @celery.task(queue="default")
622 622 def daily_digest():
623 623 """
624 624 Sends daily digest with top 50 error reports
625 625 """
626 626 request = get_current_request()
627 627 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
628 628 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
629 629 since_when = datetime.utcnow() - timedelta(hours=8)
630 630 log.warning("Generating daily digests")
631 631 for resource_id in apps:
632 632 resource_id = resource_id.decode("utf8")
633 633 end_date = datetime.utcnow().replace(microsecond=0, second=0)
634 634 filter_settings = {
635 635 "resource": [resource_id],
636 636 "tags": [{"name": "type", "value": ["error"], "op": None}],
637 637 "type": "error",
638 638 "start_date": since_when,
639 639 "end_date": end_date,
640 640 }
641 641
642 642 reports = ReportGroupService.get_trending(
643 643 request, filter_settings=filter_settings, limit=50
644 644 )
645 645
646 646 application = ApplicationService.by_id(resource_id)
647 647 if application:
648 648 users = set(
649 649 [p.user for p in ResourceService.users_for_perm(application, "view")]
650 650 )
651 651 for user in users:
652 652 user.send_digest(
653 653 request, application, reports=reports, since_when=since_when
654 654 )
655 655
656 656
657 657 @celery.task(queue="default")
658 658 def notifications_reports():
659 659 """
660 660 Loop that checks redis for info and then issues new celery tasks that
661 661 send notifications
662 662 """
663 663 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
664 664 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
665 665 for app in apps:
666 666 log.warning("Notify for app: %s" % app)
667 667 check_user_report_notifications.delay(app.decode("utf8"))
668 668
669 669
670 670 @celery.task(queue="default")
671 671 def alerting_reports():
672 672 """
672 672 Loop that checks redis for info and then issues new celery tasks to
673 673 determine the following:
674 674 - which applications should have new alerts opened
676 676 """
677 677
678 678 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports_alerting"])
679 679 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports_alerting"])
680 680 for app in apps:
681 681 log.warning("Notify for app: %s" % app)
682 682 check_alerts.delay(app.decode("utf8"))
683 683
684 684
685 685 @celery.task(
686 686 queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4, max_retries=144
687 687 )
688 688 def logs_cleanup(resource_id, filter_settings):
689 689 request = get_current_request()
690 690 request.tm.begin()
691 691 es_query = {
692 692 "query": {
693 693 "bool": {"filter": [{"term": {"resource_id": resource_id}}]}
694 694 }
695 695 }
696 696
697 697 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
698 698 if filter_settings["namespace"]:
699 699 query = query.filter(Log.namespace == filter_settings["namespace"][0])
700 700 es_query["query"]["bool"]["filter"].append(
701 701 {"term": {"namespace": filter_settings["namespace"][0]}}
702 702 )
703 703 query.delete(synchronize_session=False)
704 704 request.tm.commit()
705 705 Datastores.es.delete_by_query(
706 706 index="rcae_l_*", doc_type="log", body=es_query, conflicts="proceed"
707 707 )
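For reference, a hedged sketch of the bulk-indexing pattern used by `add_reports_es`, `add_logs_es` and the other `*_es` tasks above: documents are grouped by partition (index name), the `_index`/`_type` metadata is merged into each doc, and a single `elasticsearch.helpers.bulk` call pushes the batch. The connection, index name and payloads below are illustrative only.

import elasticsearch.helpers
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])  # assumed connection

# partition_id -> list of es_doc() style payloads (values made up)
es_docs = {
    "rcae_l_2017_10_01": [
        {"log_id": "1", "message": "first entry", "namespace": "app"},
        {"log_id": "2", "message": "second entry", "namespace": "app"},
    ]
}
for index_name, docs in es_docs.items():
    for doc in docs:
        # same pattern as the tasks above: attach bulk metadata to each doc
        doc.update({"_index": index_name, "_type": "log"})
    elasticsearch.helpers.bulk(es, docs)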
@@ -1,132 +1,132 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import sqlalchemy as sa
18 18 import logging
19 19 import hashlib
20 20
21 21 from datetime import datetime
22 22 from appenlight.models import Base
23 23 from appenlight.lib.utils import convert_es_type
24 24 from appenlight.lib.enums import LogLevel
25 25 from sqlalchemy.dialects.postgresql import JSON
26 26 from ziggurat_foundations.models.base import BaseModel
27 27
28 28 log = logging.getLogger(__name__)
29 29
30 30
31 31 class Log(Base, BaseModel):
32 32 __tablename__ = "logs"
33 33 __table_args__ = {"implicit_returning": False}
34 34
35 35 log_id = sa.Column(sa.BigInteger(), nullable=False, primary_key=True)
36 36 resource_id = sa.Column(
37 37 sa.Integer(),
38 38 sa.ForeignKey(
39 39 "applications.resource_id", onupdate="CASCADE", ondelete="CASCADE"
40 40 ),
41 41 nullable=False,
42 42 index=True,
43 43 )
44 44 log_level = sa.Column(sa.Unicode, nullable=False, index=True, default="INFO")
45 45 message = sa.Column(sa.UnicodeText(), default="")
46 46 timestamp = sa.Column(
47 47 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
48 48 )
49 49 request_id = sa.Column(sa.Unicode())
50 50 namespace = sa.Column(sa.Unicode())
51 51 primary_key = sa.Column(sa.Unicode())
52 52
53 53 tags = sa.Column(JSON(), default={})
54 54 permanent = sa.Column(sa.Boolean(), nullable=False, default=False)
55 55
56 56 def __str__(self):
57 57 return self.__unicode__().encode("utf8")
58 58
59 59 def __unicode__(self):
60 60 return "<Log id:%s, lv:%s, ns:%s >" % (
61 61 self.log_id,
62 62 self.log_level,
63 63 self.namespace,
64 64 )
65 65
66 66 def set_data(self, data, resource):
67 67 level = data.get("log_level").upper()
68 68 self.log_level = getattr(LogLevel, level, LogLevel.UNKNOWN)
69 69 self.message = data.get("message", "")
70 70 server_name = data.get("server", "").lower() or "unknown"
71 71 self.tags = {"server_name": server_name}
72 72 if data.get("tags"):
73 73 for tag_tuple in data["tags"]:
74 74 self.tags[tag_tuple[0]] = tag_tuple[1]
75 75 self.timestamp = data["date"]
76 76 r_id = data.get("request_id", "")
77 77 if not r_id:
78 78 r_id = ""
79 79 self.request_id = r_id.replace("-", "")
80 80 self.resource_id = resource.resource_id
81 81 self.namespace = data.get("namespace") or ""
82 82 self.permanent = data.get("permanent")
83 83 self.primary_key = data.get("primary_key")
84 84 if self.primary_key is not None:
85 85 self.tags["appenlight_primary_key"] = self.primary_key
86 86
87 87 def get_dict(self):
88 88 instance_dict = super(Log, self).get_dict()
89 89 instance_dict["log_level"] = LogLevel.key_from_value(self.log_level)
90 90 instance_dict["resource_name"] = self.application.resource_name
91 91 return instance_dict
92 92
93 93 @property
94 94 def delete_hash(self):
95 95 if not self.primary_key:
96 96 return None
97 97
98 98 to_hash = "{}_{}_{}".format(self.resource_id, self.primary_key, self.namespace)
99 99 return hashlib.sha1(to_hash.encode("utf8")).hexdigest()
100 100
101 101 def es_doc(self):
102 102 tags = {}
103 103 tag_list = []
104 104 for name, value in self.tags.items():
105 105 # replace dot in indexed tag name
106 106 name = name.replace(".", "_")
107 107 tag_list.append(name)
108 108 tags[name] = {
109 109 "values": convert_es_type(value),
110 110 "numeric_values": value
111 111 if (isinstance(value, (int, float)) and not isinstance(value, bool))
112 112 else None,
113 113 }
114 114 return {
115 "pg_id": str(self.log_id),
115 "log_id": str(self.log_id),
116 116 "delete_hash": self.delete_hash,
117 117 "resource_id": self.resource_id,
118 118 "request_id": self.request_id,
119 119 "log_level": LogLevel.key_from_value(self.log_level),
120 120 "timestamp": self.timestamp,
121 121 "message": self.message if self.message else "",
122 122 "namespace": self.namespace if self.namespace else "",
123 123 "tags": tags,
124 124 "tag_list": tag_list,
125 125 }
126 126
127 127 @property
128 128 def partition_id(self):
129 129 if self.permanent:
130 130 return "rcae_l_%s" % self.timestamp.strftime("%Y_%m")
131 131 else:
132 132 return "rcae_l_%s" % self.timestamp.strftime("%Y_%m_%d")
@@ -1,68 +1,69 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from datetime import datetime
18 18
19 19 import sqlalchemy as sa
20 20 from sqlalchemy.dialects.postgresql import JSON
21 21
22 22 from ziggurat_foundations.models.base import BaseModel
23 23 from appenlight.lib.utils import convert_es_type
24 24 from appenlight.models import Base
25 25
26 26
27 27 class Metric(Base, BaseModel):
28 28 __tablename__ = "metrics"
29 29 __table_args__ = {"implicit_returning": False}
30 30
31 31 pkey = sa.Column(sa.BigInteger(), primary_key=True)
32 32 resource_id = sa.Column(
33 33 sa.Integer(),
34 34 sa.ForeignKey("applications.resource_id"),
35 35 nullable=False,
36 36 primary_key=True,
37 37 )
38 38 timestamp = sa.Column(
39 39 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
40 40 )
41 41 tags = sa.Column(JSON(), default={})
42 42 namespace = sa.Column(sa.Unicode(255))
43 43
44 44 @property
45 45 def partition_id(self):
46 46 return "rcae_m_%s" % self.timestamp.strftime("%Y_%m_%d")
47 47
48 48 def es_doc(self):
49 49 tags = {}
50 50 tag_list = []
51 51 for name, value in self.tags.items():
52 52 # replace dot in indexed tag name
53 53 name = name.replace(".", "_")
54 54 tag_list.append(name)
55 55 tags[name] = {
56 56 "values": convert_es_type(value),
57 57 "numeric_values": value
58 58 if (isinstance(value, (int, float)) and not isinstance(value, bool))
59 59 else None,
60 60 }
61 61
62 62 return {
63 "metric_id": self.pkey,
63 64 "resource_id": self.resource_id,
64 65 "timestamp": self.timestamp,
65 66 "namespace": self.namespace,
66 67 "tags": tags,
67 68 "tag_list": tag_list,
68 69 }
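`Metric.es_doc()` now carries an explicit `metric_id`. A hedged sketch of how such a document ends up in its daily partition, mirroring `add_metrics_es` in tasks.py; the connection and all field values are illustrative.

from datetime import datetime
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])  # assumed connection
doc = {
    "metric_id": 42,
    "resource_id": 1,
    "timestamp": datetime(2017, 10, 1, 12, 0),
    "namespace": "requests",
    "tags": {"server_name": {"values": "web01", "numeric_values": None}},
    "tag_list": ["server_name"],
}
# daily partition, same convention as Metric.partition_id above
partition = "rcae_m_%s" % doc["timestamp"].strftime("%Y_%m_%d")
es.index(partition, "log", doc)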
@@ -1,529 +1,534 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from datetime import datetime, timedelta
18 18 import math
19 19 import uuid
20 20 import hashlib
21 21 import copy
22 22 import urllib.parse
23 23 import logging
24 24 import sqlalchemy as sa
25 25
26 26 from appenlight.models import Base, Datastores
27 27 from appenlight.lib.utils.date_utils import convert_date
28 28 from appenlight.lib.utils import convert_es_type
29 29 from appenlight.models.slow_call import SlowCall
30 30 from appenlight.lib.utils import channelstream_request
31 31 from appenlight.lib.enums import ReportType, Language
32 32 from pyramid.threadlocal import get_current_registry, get_current_request
33 33 from sqlalchemy.dialects.postgresql import JSON
34 34 from ziggurat_foundations.models.base import BaseModel
35 35
36 36 log = logging.getLogger(__name__)
37 37
38 38 REPORT_TYPE_MATRIX = {
39 39 "http_status": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
40 40 "group:priority": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
41 41 "duration": {"type": "float", "ops": ("ge", "le")},
42 42 "url_domain": {
43 43 "type": "unicode",
44 44 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
45 45 },
46 46 "url_path": {
47 47 "type": "unicode",
48 48 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
49 49 },
50 50 "error": {
51 51 "type": "unicode",
52 52 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
53 53 },
54 54 "tags:server_name": {
55 55 "type": "unicode",
56 56 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
57 57 },
58 58 "traceback": {"type": "unicode", "ops": ("contains",)},
59 59 "group:occurences": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
60 60 }
61 61
62 62
63 63 class Report(Base, BaseModel):
64 64 __tablename__ = "reports"
65 65 __table_args__ = {"implicit_returning": False}
66 66
67 67 id = sa.Column(sa.Integer, nullable=False, primary_key=True)
68 68 group_id = sa.Column(
69 69 sa.BigInteger,
70 70 sa.ForeignKey("reports_groups.id", ondelete="cascade", onupdate="cascade"),
71 71 )
72 72 resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
73 73 report_type = sa.Column(sa.Integer(), nullable=False, index=True)
74 74 error = sa.Column(sa.UnicodeText(), index=True)
75 75 extra = sa.Column(JSON(), default={})
76 76 request = sa.Column(JSON(), nullable=False, default={})
77 77 ip = sa.Column(sa.String(39), index=True, default="")
78 78 username = sa.Column(sa.Unicode(255), default="")
79 79 user_agent = sa.Column(sa.Unicode(255), default="")
80 80 url = sa.Column(sa.UnicodeText(), index=True)
81 81 request_id = sa.Column(sa.Text())
82 82 request_stats = sa.Column(JSON(), nullable=False, default={})
83 83 traceback = sa.Column(JSON(), nullable=False, default=None)
84 84 traceback_hash = sa.Column(sa.Text())
85 85 start_time = sa.Column(
86 86 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
87 87 )
88 88 end_time = sa.Column(sa.DateTime())
89 89 duration = sa.Column(sa.Float, default=0)
90 90 http_status = sa.Column(sa.Integer, index=True)
91 91 url_domain = sa.Column(sa.Unicode(100), index=True)
92 92 url_path = sa.Column(sa.Unicode(255), index=True)
93 93 tags = sa.Column(JSON(), nullable=False, default={})
94 94 language = sa.Column(sa.Integer(), default=0)
95 95 # this is used to determine partition for the report
96 96 report_group_time = sa.Column(
97 97 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
98 98 )
99 99
100 100 logs = sa.orm.relationship(
101 101 "Log",
102 102 lazy="dynamic",
103 103 passive_deletes=True,
104 104 passive_updates=True,
105 105 primaryjoin="and_(Report.request_id==Log.request_id, "
106 106 "Log.request_id != None, Log.request_id != '')",
107 107 foreign_keys="[Log.request_id]",
108 108 )
109 109
110 110 slow_calls = sa.orm.relationship(
111 111 "SlowCall",
112 112 backref="detail",
113 113 cascade="all, delete-orphan",
114 114 passive_deletes=True,
115 115 passive_updates=True,
116 116 order_by="SlowCall.timestamp",
117 117 )
118 118
119 119 def set_data(self, data, resource, protocol_version=None):
120 120 self.http_status = data["http_status"]
121 121 self.priority = data["priority"]
122 122 self.error = data["error"]
123 123 report_language = data.get("language", "").lower()
124 124 self.language = getattr(Language, report_language, Language.unknown)
125 125 # we need a temp holder here to decide later
126 126 # if we want to commit the tags if the report is marked for creation
127 127 self.tags = {"server_name": data["server"], "view_name": data["view_name"]}
128 128 if data.get("tags"):
129 129 for tag_tuple in data["tags"]:
130 130 self.tags[tag_tuple[0]] = tag_tuple[1]
131 131 self.traceback = data["traceback"]
132 132 stripped_traceback = self.stripped_traceback()
133 133 tb_repr = repr(stripped_traceback).encode("utf8")
134 134 self.traceback_hash = hashlib.sha1(tb_repr).hexdigest()
135 135 url_info = urllib.parse.urlsplit(data.get("url", ""), allow_fragments=False)
136 136 self.url_domain = url_info.netloc[:128]
137 137 self.url_path = url_info.path[:2048]
138 138 self.occurences = data["occurences"]
139 139 if self.error:
140 140 self.report_type = ReportType.error
141 141 else:
142 142 self.report_type = ReportType.slow
143 143
144 144 # but if its status is 404, it's a 404-type report
145 145 if self.http_status in [404, "404"] or self.error == "404 Not Found":
146 146 self.report_type = ReportType.not_found
147 147 self.error = ""
148 148
149 149 self.generate_grouping_hash(
150 150 data.get("appenlight.group_string", data.get("group_string")),
151 151 resource.default_grouping,
152 152 protocol_version,
153 153 )
154 154
155 155 # details
156 156 if data["http_status"] in [404, "404"]:
157 157 data = {
158 158 "username": data["username"],
159 159 "ip": data["ip"],
160 160 "url": data["url"],
161 161 "user_agent": data["user_agent"],
162 162 }
163 163 if data.get("HTTP_REFERER") or data.get("http_referer"):
164 164 data["HTTP_REFERER"] = data.get("HTTP_REFERER", "") or data.get(
165 165 "http_referer", ""
166 166 )
167 167
168 168 self.resource_id = resource.resource_id
169 169 self.username = data["username"]
170 170 self.user_agent = data["user_agent"]
171 171 self.ip = data["ip"]
172 172 self.extra = {}
173 173 if data.get("extra"):
174 174 for extra_tuple in data["extra"]:
175 175 self.extra[extra_tuple[0]] = extra_tuple[1]
176 176
177 177 self.url = data["url"]
178 178 self.request_id = data.get("request_id", "").replace("-", "") or str(
179 179 uuid.uuid4()
180 180 )
181 181 request_data = data.get("request", {})
182 182
183 183 self.request = request_data
184 184 self.request_stats = data.get("request_stats") or {}
185 185 traceback = data.get("traceback")
186 186 if not traceback:
187 187 traceback = data.get("frameinfo")
188 188 self.traceback = traceback
189 189 start_date = convert_date(data.get("start_time"))
190 190 if not self.start_time or self.start_time < start_date:
191 191 self.start_time = start_date
192 192
193 193 self.end_time = convert_date(data.get("end_time"), False)
194 194 self.duration = 0
195 195
196 196 if self.start_time and self.end_time:
197 197 d = self.end_time - self.start_time
198 198 self.duration = d.total_seconds()
199 199
200 200 # update tags with other vars
201 201 if self.username:
202 202 self.tags["user_name"] = self.username
203 203 self.tags["report_language"] = Language.key_from_value(self.language)
204 204
205 205 def add_slow_calls(self, data, report_group):
206 206 slow_calls = []
207 207 for call in data.get("slow_calls", []):
208 208 sc_inst = SlowCall()
209 209 sc_inst.set_data(
210 210 call, resource_id=self.resource_id, report_group=report_group
211 211 )
212 212 slow_calls.append(sc_inst)
213 213 self.slow_calls.extend(slow_calls)
214 214 return slow_calls
215 215
216 216 def get_dict(self, request, details=False, exclude_keys=None, include_keys=None):
217 217 from appenlight.models.services.report_group import ReportGroupService
218 218
219 219 instance_dict = super(Report, self).get_dict()
220 220 instance_dict["req_stats"] = self.req_stats()
221 221 instance_dict["group"] = {}
222 222 instance_dict["group"]["id"] = self.report_group.id
223 223 instance_dict["group"]["total_reports"] = self.report_group.total_reports
224 224 instance_dict["group"]["last_report"] = self.report_group.last_report
225 225 instance_dict["group"]["priority"] = self.report_group.priority
226 226 instance_dict["group"]["occurences"] = self.report_group.occurences
227 227 instance_dict["group"]["last_timestamp"] = self.report_group.last_timestamp
228 228 instance_dict["group"]["first_timestamp"] = self.report_group.first_timestamp
229 229 instance_dict["group"]["public"] = self.report_group.public
230 230 instance_dict["group"]["fixed"] = self.report_group.fixed
231 231 instance_dict["group"]["read"] = self.report_group.read
232 232 instance_dict["group"]["average_duration"] = self.report_group.average_duration
233 233
234 234 instance_dict["resource_name"] = self.report_group.application.resource_name
235 235 instance_dict["report_type"] = self.report_type
236 236
237 237 if instance_dict["http_status"] == 404 and not instance_dict["error"]:
238 238 instance_dict["error"] = "404 Not Found"
239 239
240 240 if details:
241 241 instance_dict[
242 242 "affected_users_count"
243 243 ] = ReportGroupService.affected_users_count(self.report_group)
244 244 instance_dict["top_affected_users"] = [
245 245 {"username": u.username, "count": u.count}
246 246 for u in ReportGroupService.top_affected_users(self.report_group)
247 247 ]
248 248 instance_dict["application"] = {"integrations": []}
249 249 for integration in self.report_group.application.integrations:
250 250 if integration.front_visible:
251 251 instance_dict["application"]["integrations"].append(
252 252 {
253 253 "name": integration.integration_name,
254 254 "action": integration.integration_action,
255 255 }
256 256 )
257 257 instance_dict["comments"] = [
258 258 c.get_dict() for c in self.report_group.comments
259 259 ]
260 260
261 261 instance_dict["group"]["next_report"] = None
262 262 instance_dict["group"]["previous_report"] = None
263 263 next_in_group = self.get_next_in_group(request)
264 264 previous_in_group = self.get_previous_in_group(request)
265 265 if next_in_group:
266 266 instance_dict["group"]["next_report"] = next_in_group
267 267 if previous_in_group:
268 268 instance_dict["group"]["previous_report"] = previous_in_group
269 269
270 270 # slow call ordering
271 271 def find_parent(row, data):
272 272 for r in reversed(data):
273 273 try:
274 274 if (
275 275 row["timestamp"] > r["timestamp"]
276 276 and row["end_time"] < r["end_time"]
277 277 ):
278 278 return r
279 279 except TypeError as e:
280 280 log.warning("reports_view.find_parent: %s" % e)
281 281 return None
282 282
283 283 new_calls = []
284 284 calls = [c.get_dict() for c in self.slow_calls]
285 285 while calls:
286 286 # start from end
287 287 for x in range(len(calls) - 1, -1, -1):
288 288 parent = find_parent(calls[x], calls)
289 289 if parent:
290 290 parent["children"].append(calls[x])
291 291 else:
292 292 # no parent at all? append to new calls anyway
293 293 new_calls.append(calls[x])
294 294 # print 'append', calls[x]
295 295 del calls[x]
296 296 break
297 297 instance_dict["slow_calls"] = new_calls
298 298
299 299 instance_dict["front_url"] = self.get_public_url(request)
300 300
301 301 exclude_keys_list = exclude_keys or []
302 302 include_keys_list = include_keys or []
303 303 for k in list(instance_dict.keys()):
304 304 if k == "group":
305 305 continue
306 306 if k in exclude_keys_list or (k not in include_keys_list and include_keys):
307 307 del instance_dict[k]
308 308 return instance_dict
309 309
310 310 def get_previous_in_group(self, request):
311 311 query = {
312 312 "size": 1,
313 313 "query": {
314 314 "bool": {
315 315 "filter": [
316 316 {"term": {"group_id": self.group_id}},
317 {"range": {"pg_id": {"lt": self.id}}},
317 {"range": {"report_id": {"lt": self.id}}},
318 318 ]
319 319 }
320 320 },
321 321 "sort": [{"_doc": {"order": "desc"}}],
322 322 }
323 323 result = request.es_conn.search(
324 324 body=query, index=self.partition_id, doc_type="report"
325 325 )
326 326 if result["hits"]["total"]:
327 return result["hits"]["hits"][0]["_source"]["pg_id"]
327 return result["hits"]["hits"][0]["_source"]["report_id"]
328 328
329 329 def get_next_in_group(self, request):
330 330 query = {
331 331 "size": 1,
332 332 "query": {
333 333 "bool": {
334 334 "filter": [
335 335 {"term": {"group_id": self.group_id}},
336 {"range": {"pg_id": {"gt": self.id}}},
336 {"range": {"report_id": {"gt": self.id}}},
337 337 ]
338 338 }
339 339 },
340 340 "sort": [{"_doc": {"order": "asc"}}],
341 341 }
342 342 result = request.es_conn.search(
343 343 body=query, index=self.partition_id, doc_type="report"
344 344 )
345 345 if result["hits"]["total"]:
346 return result["hits"]["hits"][0]["_source"]["pg_id"]
346 return result["hits"]["hits"][0]["_source"]["report_id"]
347 347
348 348 def get_public_url(self, request=None, report_group=None, _app_url=None):
349 349 """
350 350 Returns url that user can use to visit specific report
351 351 """
352 352 if not request:
353 353 request = get_current_request()
354 354 url = request.route_url("/", _app_url=_app_url)
355 355 if report_group:
356 356 return (url + "ui/report/%s/%s") % (report_group.id, self.id)
357 357 return (url + "ui/report/%s/%s") % (self.group_id, self.id)
358 358
359 359 def req_stats(self):
360 360 stats = self.request_stats.copy()
361 361 stats["percentages"] = {}
362 362 stats["percentages"]["main"] = 100.0
363 363 main = stats.get("main", 0.0)
364 364 if not main:
365 365 return None
366 366 for name, call_time in stats.items():
367 367 if "calls" not in name and "main" not in name and "percentages" not in name:
368 368 stats["main"] -= call_time
369 369 stats["percentages"][name] = math.floor((call_time / main * 100.0))
370 370 stats["percentages"]["main"] -= stats["percentages"][name]
371 371 if stats["percentages"]["main"] < 0.0:
372 372 stats["percentages"]["main"] = 0.0
373 373 stats["main"] = 0.0
374 374 return stats
375 375
376 376 def generate_grouping_hash(
377 377 self, hash_string=None, default_grouping=None, protocol_version=None
378 378 ):
379 379 """
380 380 Generates SHA1 hash that will be used to group reports together
381 381 """
382 382 if not hash_string:
383 383 location = self.tags.get("view_name") or self.url_path
384 384 server_name = self.tags.get("server_name") or ""
385 385 if default_grouping == "url_traceback":
386 386 hash_string = "%s_%s_%s" % (self.traceback_hash, location, self.error)
387 387 if self.language == Language.javascript:
388 388 hash_string = "%s_%s" % (self.traceback_hash, self.error)
389 389
390 390 elif default_grouping == "traceback_server":
391 391 hash_string = "%s_%s" % (self.traceback_hash, server_name)
392 392 if self.language == Language.javascript:
393 393 hash_string = "%s_%s" % (self.traceback_hash, server_name)
394 394 else:
395 395 hash_string = "%s_%s" % (self.error, location)
396 396 month = datetime.utcnow().date().replace(day=1)
397 397 hash_string = "{}_{}".format(month, hash_string)
398 398 binary_string = hash_string.encode("utf8")
399 399 self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
400 400 return self.grouping_hash
401 401
402 402 def stripped_traceback(self):
403 403 """
404 404 Traceback without local vars
405 405 """
406 406 stripped_traceback = copy.deepcopy(self.traceback)
407 407
408 408 if isinstance(stripped_traceback, list):
409 409 for row in stripped_traceback:
410 410 row.pop("vars", None)
411 411 return stripped_traceback
412 412
413 413 def notify_channel(self, report_group):
414 414 """
415 415 Sends notification to websocket channel
416 416 """
417 417 settings = get_current_registry().settings
418 418 log.info("notify channelstream")
419 419 if self.report_type != ReportType.error:
420 420 return
421 421 payload = {
422 422 "type": "message",
423 423 "user": "__system__",
424 424 "channel": "app_%s" % self.resource_id,
425 425 "message": {
426 426 "topic": "front_dashboard.new_topic",
427 427 "report": {
428 428 "group": {
429 429 "priority": report_group.priority,
430 430 "first_timestamp": report_group.first_timestamp,
431 431 "last_timestamp": report_group.last_timestamp,
432 432 "average_duration": report_group.average_duration,
433 433 "occurences": report_group.occurences,
434 434 },
435 435 "report_id": self.id,
436 436 "group_id": self.group_id,
437 437 "resource_id": self.resource_id,
438 438 "http_status": self.http_status,
439 439 "url_domain": self.url_domain,
440 440 "url_path": self.url_path,
441 441 "error": self.error or "",
442 442 "server": self.tags.get("server_name"),
443 443 "view_name": self.tags.get("view_name"),
444 444 "front_url": self.get_public_url(),
445 445 },
446 446 },
447 447 }
448 448 channelstream_request(
449 449 settings["cometd.secret"],
450 450 "/message",
451 451 [payload],
452 452 servers=[settings["cometd_servers"]],
453 453 )
454 454
455 455 def es_doc(self):
456 456 tags = {}
457 457 tag_list = []
458 458 for name, value in self.tags.items():
459 459 name = name.replace(".", "_")
460 460 tag_list.append(name)
461 461 tags[name] = {
462 462 "values": convert_es_type(value),
463 463 "numeric_values": value
464 464 if (isinstance(value, (int, float)) and not isinstance(value, bool))
465 465 else None,
466 466 }
467 467
468 468 if "user_name" not in self.tags and self.username:
469 469 tags["user_name"] = {"value": [self.username], "numeric_value": None}
470 470 return {
471 471 "_id": str(self.id),
472 "pg_id": str(self.id),
472 "report_id": str(self.id),
473 473 "resource_id": self.resource_id,
474 474 "http_status": self.http_status or "",
475 475 "start_time": self.start_time,
476 476 "end_time": self.end_time,
477 477 "url_domain": self.url_domain if self.url_domain else "",
478 478 "url_path": self.url_path if self.url_path else "",
479 479 "duration": self.duration,
480 480 "error": self.error if self.error else "",
481 481 "report_type": self.report_type,
482 482 "request_id": self.request_id,
483 483 "ip": self.ip,
484 484 "group_id": str(self.group_id),
485 "_parent": str(self.group_id),
485 "type": "report",
486 "join_field": {
487 "name": "report",
488 "parent": str(self.group_id)
489 },
486 490 "tags": tags,
487 491 "tag_list": tag_list,
492 "_routing": str(self.group_id)
488 493 }
489 494
490 495 @property
491 496 def partition_id(self):
492 497 return "rcae_r_%s" % self.report_group_time.strftime("%Y_%m")
493 498
494 499 def partition_range(self):
495 500 start_date = self.report_group_time.date().replace(day=1)
496 501 end_date = start_date + timedelta(days=40)
497 502 end_date = end_date.replace(day=1)
498 503 return start_date, end_date
499 504
500 505
501 506 def after_insert(mapper, connection, target):
502 507 if not hasattr(target, "_skip_ft_index"):
503 508 data = target.es_doc()
504 509 data.pop("_id", None)
505 510 Datastores.es.index(
506 511 target.partition_id, "report", data, parent=target.group_id, id=target.id
507 512 )
508 513
509 514
510 515 def after_update(mapper, connection, target):
511 516 if not hasattr(target, "_skip_ft_index"):
512 517 data = target.es_doc()
513 518 data.pop("_id", None)
514 519 Datastores.es.index(
515 520 target.partition_id, "report", data, parent=target.group_id, id=target.id
516 521 )
517 522
518 523
519 524 def after_delete(mapper, connection, target):
520 525 if not hasattr(target, "_skip_ft_index"):
521 query = {"query": {"term": {"pg_id": target.id}}}
526 query = {"query": {"term": {"report_id": target.id}}}
522 527 Datastores.es.delete_by_query(
523 528 index=target.partition_id, doc_type="report", body=query, conflicts="proceed"
524 529 )
525 530
526 531
527 532 sa.event.listen(Report, "after_insert", after_insert)
528 533 sa.event.listen(Report, "after_update", after_update)
529 534 sa.event.listen(Report, "after_delete", after_delete)
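With the `join_field` replacing `_parent`, report documents stay children of their report group inside a single-doctype index. A hedged sketch of a parent-aware query against that layout; the index name, error string and connection are assumptions for illustration, not part of the commit.

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])  # assumed connection
query = {
    "query": {
        "has_parent": {
            "parent_type": "report_group",
            "query": {"match": {"error": "ZeroDivisionError"}},  # illustrative
        }
    }
}
result = es.search(index="rcae_r_2017_10", doc_type="report", body=query)
for hit in result["hits"]["hits"]:
    print(hit["_source"]["report_id"], hit["_source"]["group_id"])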
@@ -1,285 +1,285 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import logging
18 18 import sqlalchemy as sa
19 19
20 20 from datetime import datetime, timedelta
21 21
22 22 from pyramid.threadlocal import get_current_request
23 23 from sqlalchemy.dialects.postgresql import JSON
24 24 from ziggurat_foundations.models.base import BaseModel
25 25
26 26 from appenlight.models import Base, get_db_session, Datastores
27 27 from appenlight.lib.enums import ReportType
28 28 from appenlight.lib.rule import Rule
29 29 from appenlight.lib.redis_keys import REDIS_KEYS
30 30 from appenlight.models.report import REPORT_TYPE_MATRIX
31 31
32 32 log = logging.getLogger(__name__)
33 33
34 34
35 35 class ReportGroup(Base, BaseModel):
36 36 __tablename__ = "reports_groups"
37 37 __table_args__ = {"implicit_returning": False}
38 38
39 39 id = sa.Column(sa.BigInteger(), nullable=False, primary_key=True)
40 40 resource_id = sa.Column(
41 41 sa.Integer(),
42 42 sa.ForeignKey(
43 43 "applications.resource_id", onupdate="CASCADE", ondelete="CASCADE"
44 44 ),
45 45 nullable=False,
46 46 index=True,
47 47 )
48 48 priority = sa.Column(
49 49 sa.Integer, nullable=False, index=True, default=5, server_default="5"
50 50 )
51 51 first_timestamp = sa.Column(
52 52 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
53 53 )
54 54 last_timestamp = sa.Column(
55 55 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
56 56 )
57 57 error = sa.Column(sa.UnicodeText(), index=True)
58 58 grouping_hash = sa.Column(sa.String(40), default="")
59 59 triggered_postprocesses_ids = sa.Column(JSON(), nullable=False, default=list)
60 60 report_type = sa.Column(sa.Integer, default=1)
61 61 total_reports = sa.Column(sa.Integer, default=1)
62 62 last_report = sa.Column(sa.Integer)
63 63 occurences = sa.Column(sa.Integer, default=1)
64 64 average_duration = sa.Column(sa.Float, default=0)
65 65 summed_duration = sa.Column(sa.Float, default=0)
66 66 read = sa.Column(sa.Boolean(), index=True, default=False)
67 67 fixed = sa.Column(sa.Boolean(), index=True, default=False)
68 68 notified = sa.Column(sa.Boolean(), index=True, default=False)
69 69 public = sa.Column(sa.Boolean(), index=True, default=False)
70 70
71 71 reports = sa.orm.relationship(
72 72 "Report",
73 73 lazy="dynamic",
74 74 backref="report_group",
75 75 cascade="all, delete-orphan",
76 76 passive_deletes=True,
77 77 passive_updates=True,
78 78 )
79 79
80 80 comments = sa.orm.relationship(
81 81 "ReportComment",
82 82 lazy="dynamic",
83 83 backref="report",
84 84 cascade="all, delete-orphan",
85 85 passive_deletes=True,
86 86 passive_updates=True,
87 87 order_by="ReportComment.comment_id",
88 88 )
89 89
90 90 assigned_users = sa.orm.relationship(
91 91 "User",
92 92 backref=sa.orm.backref(
93 93 "assigned_reports_relation",
94 94 lazy="dynamic",
95 95 order_by=sa.desc(sa.text("reports_groups.id")),
96 96 ),
97 97 passive_deletes=True,
98 98 passive_updates=True,
99 99 secondary="reports_assignments",
100 100 order_by="User.user_name",
101 101 )
102 102
103 103 stats = sa.orm.relationship(
104 104 "ReportStat",
105 105 lazy="dynamic",
106 106 backref="report",
107 107 passive_deletes=True,
108 108 passive_updates=True,
109 109 )
110 110
111 111 last_report_ref = sa.orm.relationship(
112 112 "Report",
113 113 uselist=False,
114 114 primaryjoin="ReportGroup.last_report " "== Report.id",
115 115 foreign_keys="Report.id",
116 116 cascade="all, delete-orphan",
117 117 passive_deletes=True,
118 118 passive_updates=True,
119 119 )
120 120
121 121 def __repr__(self):
122 122 return "<ReportGroup id:{}>".format(self.id)
123 123
124 124 def get_report(self, report_id=None, public=False):
125 125 """
126 126 Gets the report with a specific id, or the latest report if no id was specified
127 127 """
128 128 from .report import Report
129 129
130 130 if not report_id:
131 131 return self.last_report_ref
132 132 else:
133 133 return self.reports.filter(Report.id == report_id).first()
134 134
135 135 def get_public_url(self, request, _app_url=None):
136 136 url = request.route_url("/", _app_url=_app_url)
137 137 return (url + "ui/report/%s") % self.id
138 138
139 139 def run_postprocessing(self, report):
140 140 """
141 141 Alters report group priority based on postprocessing configuration
142 142 """
143 143 request = get_current_request()
144 144 get_db_session(None, self).flush()
145 145 for action in self.application.postprocess_conf:
146 146 get_db_session(None, self).flush()
147 147 rule_obj = Rule(action.rule, REPORT_TYPE_MATRIX)
148 148 report_dict = report.get_dict(request)
149 149 # if it was not processed yet
150 150 if (
151 151 rule_obj.match(report_dict)
152 152 and action.pkey not in self.triggered_postprocesses_ids
153 153 ):
154 154 action.postprocess(self)
155 155 # this way sqla can track mutation of list
156 156 self.triggered_postprocesses_ids = self.triggered_postprocesses_ids + [
157 157 action.pkey
158 158 ]
159 159
160 160 get_db_session(None, self).flush()
161 161 # do not go out of bounds
162 162 if self.priority < 1:
163 163 self.priority = 1
164 164 if self.priority > 10:
165 165 self.priority = 10
166 166
167 167 def get_dict(self, request):
168 168 instance_dict = super(ReportGroup, self).get_dict()
169 169 instance_dict["server_name"] = self.get_report().tags.get("server_name")
170 170 instance_dict["view_name"] = self.get_report().tags.get("view_name")
171 171 instance_dict["resource_name"] = self.application.resource_name
172 172 instance_dict["report_type"] = self.get_report().report_type
173 173 instance_dict["url_path"] = self.get_report().url_path
174 174 instance_dict["front_url"] = self.get_report().get_public_url(request)
175 175 del instance_dict["triggered_postprocesses_ids"]
176 176 return instance_dict
177 177
178 178 def es_doc(self):
179 179 return {
180 180 "_id": str(self.id),
181 "pg_id": str(self.id),
181 "group_id": str(self.id),
182 182 "resource_id": self.resource_id,
183 183 "error": self.error,
184 184 "fixed": self.fixed,
185 185 "public": self.public,
186 186 "read": self.read,
187 187 "priority": self.priority,
188 188 "occurences": self.occurences,
189 189 "average_duration": self.average_duration,
190 190 "summed_duration": self.summed_duration,
191 191 "first_timestamp": self.first_timestamp,
192 192 "last_timestamp": self.last_timestamp,
193 "type": "report_group",
194 "join_field": {
195 "name": "report_group"
196 },
193 197 }
194 198
195 199 def set_notification_info(self, notify_10=False, notify_100=False):
196 200 """
197 201 Update redis notification maps for notification job
198 202 """
199 203 current_time = datetime.utcnow().replace(second=0, microsecond=0)
200 204 # global app counter
201 205 key = REDIS_KEYS["counters"]["reports_per_type"].format(
202 206 self.report_type, current_time
203 207 )
204 208 redis_pipeline = Datastores.redis.pipeline()
205 209 redis_pipeline.incr(key)
206 210 redis_pipeline.expire(key, 3600 * 24)
207 211 # detailed app notification for alerts and notifications
208 212 redis_pipeline.sadd(REDIS_KEYS["apps_that_had_reports"], self.resource_id)
209 213 redis_pipeline.sadd(
210 214 REDIS_KEYS["apps_that_had_reports_alerting"], self.resource_id
211 215 )
212 216 # only notify for exceptions here
213 217 if self.report_type == ReportType.error:
214 218 redis_pipeline.sadd(REDIS_KEYS["apps_that_had_reports"], self.resource_id)
215 219 redis_pipeline.sadd(
216 220 REDIS_KEYS["apps_that_had_error_reports_alerting"], self.resource_id
217 221 )
218 222 key = REDIS_KEYS["counters"]["report_group_occurences"].format(self.id)
219 223 redis_pipeline.incr(key)
220 224 redis_pipeline.expire(key, 3600 * 24)
221 225 key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format(self.id)
222 226 redis_pipeline.incr(key)
223 227 redis_pipeline.expire(key, 3600 * 24)
224 228
225 229 if notify_10:
226 230 key = REDIS_KEYS["counters"]["report_group_occurences_10th"].format(self.id)
227 231 redis_pipeline.setex(key, 3600 * 24, 1)
228 232 if notify_100:
229 233 key = REDIS_KEYS["counters"]["report_group_occurences_100th"].format(
230 234 self.id
231 235 )
232 236 redis_pipeline.setex(key, 3600 * 24, 1)
233 237
234 238 key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
235 239 self.report_type, self.resource_id
236 240 )
237 241 redis_pipeline.sadd(key, self.id)
238 242 redis_pipeline.expire(key, 3600 * 24)
239 243 key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
240 244 self.report_type, self.resource_id
241 245 )
242 246 redis_pipeline.sadd(key, self.id)
243 247 redis_pipeline.expire(key, 3600 * 24)
244 248 redis_pipeline.execute()
245 249
246 250 @property
247 251 def partition_id(self):
248 252 return "rcae_r_%s" % self.first_timestamp.strftime("%Y_%m")
249 253
250 254 def partition_range(self):
251 255 start_date = self.first_timestamp.date().replace(day=1)
252 256 end_date = start_date + timedelta(days=40)
253 257 end_date = end_date.replace(day=1)
254 258 return start_date, end_date
255 259
256 260
257 261 def after_insert(mapper, connection, target):
258 262 if not hasattr(target, "_skip_ft_index"):
259 263 data = target.es_doc()
260 264 data.pop("_id", None)
261 Datastores.es.index(target.partition_id, "report_group", data, id=target.id)
265 Datastores.es.index(target.partition_id, "report", data, id=target.id)
262 266
263 267
264 268 def after_update(mapper, connection, target):
265 269 if not hasattr(target, "_skip_ft_index"):
266 270 data = target.es_doc()
267 271 data.pop("_id", None)
268 Datastores.es.index(target.partition_id, "report_group", data, id=target.id)
272 Datastores.es.index(target.partition_id, "report", data, id=target.id)
269 273
270 274
271 275 def after_delete(mapper, connection, target):
272 276 query = {"query": {"term": {"group_id": target.id}}}
273 277 # delete by query
274 278 Datastores.es.delete_by_query(
275 279 index=target.partition_id, doc_type="report", body=query, conflicts="proceed"
276 280 )
277 query = {"query": {"term": {"pg_id": target.id}}}
278 Datastores.es.delete_by_query(
279 index=target.partition_id, doc_type="report_group", body=query, conflicts="proceed"
280 )
281 281
282 282
283 283 sa.event.listen(ReportGroup, "after_insert", after_insert)
284 284 sa.event.listen(ReportGroup, "after_update", after_update)
285 285 sa.event.listen(ReportGroup, "after_delete", after_delete)
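The group documents above register as the parent side of the join ("join_field": {"name": "report_group"}), so child report documents written elsewhere have to be routed to the group id in order to land on the same shard. A hedged sketch of such a write with the elasticsearch-py client used in this codebase (values are illustrative):

Datastores.es.index(
    target.partition_id,           # e.g. "rcae_r_2017_05"
    "report",                      # the single doctype
    data,                          # body produced by es_doc()
    id=target.id,
    routing=str(target.group_id),  # route the child to the parent's shard
)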
@@ -1,79 +1,81 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import sqlalchemy as sa
18 18
19 19 from appenlight.lib.enums import ReportType
20 20 from appenlight.models import Base
21 21 from ziggurat_foundations.models.base import BaseModel
22 22
23 23
24 24 class ReportStat(Base, BaseModel):
25 25 __tablename__ = "reports_stats"
26 26 __table_args__ = {"implicit_returning": False}
27 27
28 28 group_id = sa.Column(
29 29 sa.BigInteger(), sa.ForeignKey("reports_groups.id"), nullable=False
30 30 )
31 31 resource_id = sa.Column(
32 32 sa.Integer(), sa.ForeignKey("applications.resource_id"), nullable=False
33 33 )
34 34 start_interval = sa.Column(sa.DateTime(), nullable=False)
35 35 occurences = sa.Column(sa.Integer, nullable=True, default=0)
36 36 owner_user_id = sa.Column(sa.Integer(), sa.ForeignKey("users.id"), nullable=True)
37 37 type = sa.Column(sa.Integer, nullable=True, default=0)
38 38 duration = sa.Column(sa.Float, nullable=True, default=0)
39 39 id = sa.Column(sa.BigInteger, nullable=False, primary_key=True)
40 40 server_name = sa.Column(sa.Unicode(128), nullable=False, default="")
41 41 view_name = sa.Column(sa.Unicode(128), nullable=False, default="")
42 42
43 43 @property
44 44 def partition_id(self):
45 45 return "rcae_r_%s" % self.start_interval.strftime("%Y_%m")
46 46
47 47 def es_doc(self):
48 48 return {
49 49 "resource_id": self.resource_id,
50 50 "timestamp": self.start_interval,
51 "pg_id": str(self.id),
51 "report_stat_id": str(self.id),
52 52 "permanent": True,
53 53 "request_id": None,
54 54 "log_level": "ERROR",
55 55 "message": None,
56 56 "namespace": "appenlight.error",
57 "group_id": str(self.group_id),
57 58 "tags": {
58 59 "duration": {"values": self.duration, "numeric_values": self.duration},
59 60 "occurences": {
60 61 "values": self.occurences,
61 62 "numeric_values": self.occurences,
62 63 },
63 64 "group_id": {"values": self.group_id, "numeric_values": self.group_id},
64 65 "type": {
65 66 "values": ReportType.key_from_value(self.type),
66 67 "numeric_values": self.type,
67 68 },
68 69 "server_name": {"values": self.server_name, "numeric_values": None},
69 70 "view_name": {"values": self.view_name, "numeric_values": None},
70 71 },
71 72 "tag_list": [
72 73 "duration",
73 74 "occurences",
74 75 "group_id",
75 76 "type",
76 77 "server_name",
77 78 "view_name",
78 79 ],
80 "type": "report_stat",
79 81 }
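With the _type metafield gone, the literal "type" field added above is what distinguishes document kinds inside a shared index. A hedged sketch of a filter that selects only stat documents (the resource id is a made-up value):

stat_query = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"type": "report_stat"}},  # discriminator field from es_doc()
                {"term": {"resource_id": 42}},      # hypothetical resource id
            ]
        }
    }
}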
@@ -1,222 +1,222 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import paginate
18 18 import logging
19 19 import sqlalchemy as sa
20 20
21 21 from appenlight.models.log import Log
22 22 from appenlight.models import get_db_session, Datastores
23 23 from appenlight.models.services.base import BaseService
24 24 from appenlight.lib.utils import es_index_name_limiter
25 25
26 26 log = logging.getLogger(__name__)
27 27
28 28
29 29 class LogService(BaseService):
30 30 @classmethod
31 31 def get_logs(cls, resource_ids=None, filter_settings=None, db_session=None):
32 32 # ensure we always have ids passed
33 33 if not resource_ids:
34 34 # raise Exception('No App ID passed')
35 35 return []
36 36 db_session = get_db_session(db_session)
37 37 q = db_session.query(Log)
38 38 q = q.filter(Log.resource_id.in_(resource_ids))
39 39 if filter_settings.get("start_date"):
40 40 q = q.filter(Log.timestamp >= filter_settings.get("start_date"))
41 41 if filter_settings.get("end_date"):
42 42 q = q.filter(Log.timestamp <= filter_settings.get("end_date"))
43 43 if filter_settings.get("log_level"):
44 44 q = q.filter(Log.log_level == filter_settings.get("log_level").upper())
45 45 if filter_settings.get("request_id"):
46 46 request_id = filter_settings.get("request_id", "")
47 47 q = q.filter(Log.request_id == request_id.replace("-", ""))
48 48 if filter_settings.get("namespace"):
49 49 q = q.filter(Log.namespace == filter_settings.get("namespace"))
50 50 q = q.order_by(sa.desc(Log.timestamp))
51 51 return q
52 52
53 53 @classmethod
54 54 def es_query_builder(cls, app_ids, filter_settings):
55 55 if not filter_settings:
56 56 filter_settings = {}
57 57
58 58 query = {
59 59 "query": {
60 60 "bool": {
61 61 "filter": [{"terms": {"resource_id": list(app_ids)}}]
62 62 }
63 63 }
64 64 }
65 65
66 66 start_date = filter_settings.get("start_date")
67 67 end_date = filter_settings.get("end_date")
68 68 filter_part = query["query"]["bool"]["filter"]
69 69
70 70 for tag in filter_settings.get("tags", []):
71 71 tag_values = [v.lower() for v in tag["value"]]
72 72 key = "tags.%s.values" % tag["name"].replace(".", "_")
73 73 filter_part.append({"terms": {key: tag_values}})
74 74
75 75 date_range = {"range": {"timestamp": {}}}
76 76 if start_date:
77 77 date_range["range"]["timestamp"]["gte"] = start_date
78 78 if end_date:
79 79 date_range["range"]["timestamp"]["lte"] = end_date
80 80 if start_date or end_date:
81 81 filter_part.append(date_range)
82 82
83 83 levels = filter_settings.get("level")
84 84 if levels:
85 85 filter_part.append({"terms": {"log_level": levels}})
86 86 namespaces = filter_settings.get("namespace")
87 87 if namespaces:
88 88 filter_part.append({"terms": {"namespace": namespaces}})
89 89
90 90 request_ids = filter_settings.get("request_id")
91 91 if request_ids:
92 92 filter_part.append({"terms": {"request_id": request_ids}})
93 93
94 94 messages = filter_settings.get("message")
95 95 if messages:
96 96 query["query"]["bool"]["must"] = {
97 97 "match": {"message": {"query": " ".join(messages), "operator": "and"}}
98 98 }
99 99 return query
100 100
101 101 @classmethod
102 102 def get_time_series_aggregate(cls, app_ids=None, filter_settings=None):
103 103 if not app_ids:
104 104 return {}
105 105 es_query = cls.es_query_builder(app_ids, filter_settings)
106 106 es_query["aggs"] = {
107 107 "events_over_time": {
108 108 "date_histogram": {
109 109 "field": "timestamp",
110 110 "interval": "1h",
111 111 "min_doc_count": 0,
112 112 "extended_bounds": {
113 113 "max": filter_settings.get("end_date"),
114 114 "min": filter_settings.get("start_date"),
115 115 },
116 116 }
117 117 }
118 118 }
119 119 log.debug(es_query)
120 120 index_names = es_index_name_limiter(
121 121 filter_settings.get("start_date"),
122 122 filter_settings.get("end_date"),
123 123 ixtypes=["logs"],
124 124 )
125 125 if index_names:
126 126 results = Datastores.es.search(
127 127 body=es_query, index=index_names, doc_type="log", size=0
128 128 )
129 129 else:
130 130 results = []
131 131 return results
132 132
133 133 @classmethod
134 134 def get_search_iterator(
135 135 cls,
136 136 app_ids=None,
137 137 page=1,
138 138 items_per_page=50,
139 139 order_by=None,
140 140 filter_settings=None,
141 141 limit=None,
142 142 ):
143 143 if not app_ids:
144 144 return {}, 0
145 145
146 146 es_query = cls.es_query_builder(app_ids, filter_settings)
147 147 sort_query = {"sort": [{"timestamp": {"order": "desc"}}]}
148 148 es_query.update(sort_query)
149 149 log.debug(es_query)
150 150 es_from = (page - 1) * items_per_page
151 151 index_names = es_index_name_limiter(
152 152 filter_settings.get("start_date"),
153 153 filter_settings.get("end_date"),
154 154 ixtypes=["logs"],
155 155 )
156 156 if not index_names:
157 157 return {}, 0
158 158
159 159 results = Datastores.es.search(
160 160 body=es_query,
161 161 index=index_names,
162 162 doc_type="log",
163 163 size=items_per_page,
164 164 from_=es_from,
165 165 )
166 166 if results["hits"]["total"] > 5000:
167 167 count = 5000
168 168 else:
169 169 count = results["hits"]["total"]
170 170 return results["hits"], count
171 171
172 172 @classmethod
173 173 def get_paginator_by_app_ids(
174 174 cls,
175 175 app_ids=None,
176 176 page=1,
177 177 item_count=None,
178 178 items_per_page=50,
179 179 order_by=None,
180 180 filter_settings=None,
181 181 exclude_columns=None,
182 182 db_session=None,
183 183 ):
184 184 if not filter_settings:
185 185 filter_settings = {}
186 186 results, item_count = cls.get_search_iterator(
187 187 app_ids, page, items_per_page, order_by, filter_settings
188 188 )
189 189 paginator = paginate.Page(
190 190 [], item_count=item_count, items_per_page=items_per_page, **filter_settings
191 191 )
192 192 ordered_ids = tuple(
193 item["_source"]["pg_id"] for item in results.get("hits", [])
193 item["_source"]["log_id"] for item in results.get("hits", [])
194 194 )
195 195
196 196 sorted_instance_list = []
197 197 if ordered_ids:
198 198 db_session = get_db_session(db_session)
199 199 query = db_session.query(Log)
200 200 query = query.filter(Log.log_id.in_(ordered_ids))
201 201 query = query.order_by(sa.desc("timestamp"))
202 202 sa_items = query.all()
203 203 # resort by score
204 204 for i_id in ordered_ids:
205 205 for item in sa_items:
206 206 if str(item.log_id) == str(i_id):
207 207 sorted_instance_list.append(item)
208 208 paginator.sa_items = sorted_instance_list
209 209 return paginator
210 210
211 211 @classmethod
212 212 def query_by_primary_key_and_namespace(cls, list_of_pairs, db_session=None):
213 213 db_session = get_db_session(db_session)
214 214 list_of_conditions = []
215 215 query = db_session.query(Log)
216 216 for pair in list_of_pairs:
217 217 list_of_conditions.append(
218 218 sa.and_(Log.primary_key == pair["pk"], Log.namespace == pair["ns"])
219 219 )
220 220 query = query.filter(sa.or_(*list_of_conditions))
221 221 query = query.order_by(sa.asc(Log.timestamp), sa.asc(Log.log_id))
222 222 return query
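For reference, a hedged usage sketch of es_query_builder() above: with a single application id and a level filter it produces roughly the following bool/filter body (ids are illustrative):

query = LogService.es_query_builder(
    app_ids=[42],
    filter_settings={"level": ["error"]},
)
# query == {
#     "query": {"bool": {"filter": [
#         {"terms": {"resource_id": [42]}},
#         {"terms": {"log_level": ["error"]}},
#     ]}}
# }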
@@ -1,521 +1,521 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import logging
18 18 import paginate
19 19 import sqlalchemy as sa
20 20 import appenlight.lib.helpers as h
21 21
22 22 from datetime import datetime
23 23
24 24 from appenlight.models import get_db_session, Datastores
25 25 from appenlight.models.report import Report
26 26 from appenlight.models.report_group import ReportGroup
27 27 from appenlight.models.report_comment import ReportComment
28 28 from appenlight.models.user import User
29 29 from appenlight.models.services.base import BaseService
30 30 from appenlight.lib.enums import ReportType
31 31 from appenlight.lib.utils import es_index_name_limiter
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35
36 36 class ReportGroupService(BaseService):
37 37 @classmethod
38 38 def get_trending(cls, request, filter_settings, limit=15, db_session=None):
39 39 """
40 40 Returns report groups trending for a specific time interval
41 41 """
42 42 db_session = get_db_session(db_session)
43 43
44 44 tags = []
45 45 if filter_settings.get("tags"):
46 46 for tag in filter_settings["tags"]:
47 47 tags.append(
48 48 {"terms": {"tags.{}.values".format(tag["name"]): tag["value"]}}
49 49 )
50 50
51 51 index_names = es_index_name_limiter(
52 52 start_date=filter_settings["start_date"],
53 53 end_date=filter_settings["end_date"],
54 54 ixtypes=["reports"],
55 55 )
56 56
57 57 if not index_names or not filter_settings["resource"]:
58 58 return []
59 59
60 60 es_query = {
61 61 "aggs": {
62 62 "parent_agg": {
63 63 "aggs": {
64 64 "groups": {
65 65 "aggs": {
66 66 "sub_agg": {
67 67 "value_count": {"field": "tags.group_id.values.keyword"}
68 68 }
69 69 },
70 70 "filter": {"exists": {"field": "tags.group_id.values"}},
71 71 }
72 72 },
73 73 "terms": {"field": "tags.group_id.values.keyword", "size": limit},
74 74 }
75 75 },
76 76 "query": {
77 77 "bool": {
78 78 "filter": [
79 79 {
80 80 "terms": {
81 81 "resource_id": [filter_settings["resource"][0]]
82 82 }
83 83 },
84 84 {
85 85 "range": {
86 86 "timestamp": {
87 87 "gte": filter_settings["start_date"],
88 88 "lte": filter_settings["end_date"],
89 89 }
90 90 }
91 91 },
92 92 ]
93 93 }
94 94 },
95 95 }
96 96 if tags:
97 97 es_query["query"]["bool"]["filter"].extend(tags)
98 98
99 99 result = Datastores.es.search(
100 body=es_query, index=index_names, doc_type="log", size=0
100 body=es_query, index=index_names, doc_type="report", size=0
101 101 )
102 102 series = []
103 103 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
104 104 series.append(
105 105 {"key": bucket["key"], "groups": bucket["groups"]["sub_agg"]["value"]}
106 106 )
107 107
108 108 report_groups_d = {}
109 109 for g in series:
110 110 report_groups_d[int(g["key"])] = g["groups"] or 0
111 111
112 112 query = db_session.query(ReportGroup)
113 113 query = query.filter(ReportGroup.id.in_(list(report_groups_d.keys())))
114 114 query = query.options(sa.orm.joinedload(ReportGroup.last_report_ref))
115 115 results = [(report_groups_d[group.id], group) for group in query]
116 116 return sorted(results, reverse=True, key=lambda x: x[0])
117 117
118 118 @classmethod
119 119 def get_search_iterator(
120 120 cls,
121 121 app_ids=None,
122 122 page=1,
123 123 items_per_page=50,
124 124 order_by=None,
125 125 filter_settings=None,
126 126 limit=None,
127 127 ):
128 128 if not app_ids:
129 129 return {}
130 130 if not filter_settings:
131 131 filter_settings = {}
132 132
133 133 query = {
134 134 "size": 0,
135 135 "query": {
136 136 "bool": {
137 137 "must": [],
138 138 "should": [],
139 139 "filter": [{"terms": {"resource_id": list(app_ids)}}]
140 140 }
141 141 },
142 142 "aggs": {
143 143 "top_groups": {
144 144 "terms": {
145 145 "size": 5000,
146 "field": "_parent#report_group",
146 "field": "join_field#report_group",
147 147 "order": {"newest": "desc"},
148 148 },
149 149 "aggs": {
150 150 "top_reports_hits": {
151 151 "top_hits": {"size": 1, "sort": {"start_time": "desc"}}
152 152 },
153 153 "newest": {"max": {"field": "start_time"}},
154 154 },
155 155 }
156 156 },
157 157 }
158 158
159 159 start_date = filter_settings.get("start_date")
160 160 end_date = filter_settings.get("end_date")
161 161 filter_part = query["query"]["bool"]["filter"]
162 162 date_range = {"range": {"start_time": {}}}
163 163 if start_date:
164 164 date_range["range"]["start_time"]["gte"] = start_date
165 165 if end_date:
166 166 date_range["range"]["start_time"]["lte"] = end_date
167 167 if start_date or end_date:
168 168 filter_part.append(date_range)
169 169
170 170 priorities = filter_settings.get("priority")
171 171
172 172 for tag in filter_settings.get("tags", []):
173 173 tag_values = [v.lower() for v in tag["value"]]
174 174 key = "tags.%s.values" % tag["name"].replace(".", "_")
175 175 filter_part.append({"terms": {key: tag_values}})
176 176
177 177 if priorities:
178 178 filter_part.append(
179 179 {
180 180 "has_parent": {
181 181 "parent_type": "report_group",
182 182 "query": {"terms": {"priority": priorities}},
183 183 }
184 184 }
185 185 )
186 186
187 187 min_occurences = filter_settings.get("min_occurences")
188 188 if min_occurences:
189 189 filter_part.append(
190 190 {
191 191 "has_parent": {
192 192 "parent_type": "report_group",
193 193 "query": {"range": {"occurences": {"gte": min_occurences[0]}}},
194 194 }
195 195 }
196 196 )
197 197
198 198 min_duration = filter_settings.get("min_duration")
199 199 max_duration = filter_settings.get("max_duration")
200 200
201 201 request_ids = filter_settings.get("request_id")
202 202 if request_ids:
203 203 filter_part.append({"terms": {"request_id": request_ids}})
204 204
205 205 duration_range = {"range": {"average_duration": {}}}
206 206 if min_duration:
207 207 duration_range["range"]["average_duration"]["gte"] = min_duration[0]
208 208 if max_duration:
209 209 duration_range["range"]["average_duration"]["lte"] = max_duration[0]
210 210 if min_duration or max_duration:
211 211 filter_part.append(
212 212 {"has_parent": {"parent_type": "report_group", "query": duration_range}}
213 213 )
214 214
215 215 http_status = filter_settings.get("http_status")
216 216 report_type = filter_settings.get("report_type", [ReportType.error])
217 217 # filter by report type when no http status filter is given
218 218 # or when we are dealing with slow reports
219 219 if not http_status or ReportType.slow in report_type:
220 220 filter_part.append({"terms": {"report_type": report_type}})
221 221 if http_status:
222 222 filter_part.append({"terms": {"http_status": http_status}})
223 223
224 224 messages = filter_settings.get("message")
225 225 if messages:
226 226 condition = {"match": {"message": " ".join(messages)}}
227 227 query["query"]["bool"]["must"].append(condition)
228 228 errors = filter_settings.get("error")
229 229 if errors:
230 230 condition = {"match": {"error": " ".join(errors)}}
231 231 query["query"]["bool"]["must"].append(condition)
232 232 url_domains = filter_settings.get("url_domain")
233 233 if url_domains:
234 234 condition = {"terms": {"url_domain": url_domains}}
235 235 query["query"]["bool"]["must"].append(condition)
236 236 url_paths = filter_settings.get("url_path")
237 237 if url_paths:
238 238 condition = {"terms": {"url_path": url_paths}}
239 239 query["query"]["bool"]["must"].append(condition)
240 240
241 241 if filter_settings.get("report_status"):
242 242 for status in filter_settings.get("report_status"):
243 243 if status == "never_reviewed":
244 244 filter_part.append(
245 245 {
246 246 "has_parent": {
247 247 "parent_type": "report_group",
248 248 "query": {"term": {"read": False}},
249 249 }
250 250 }
251 251 )
252 252 elif status == "reviewed":
253 253 filter_part.append(
254 254 {
255 255 "has_parent": {
256 256 "parent_type": "report_group",
257 257 "query": {"term": {"read": True}},
258 258 }
259 259 }
260 260 )
261 261 elif status == "public":
262 262 filter_part.append(
263 263 {
264 264 "has_parent": {
265 265 "parent_type": "report_group",
266 266 "query": {"term": {"public": True}},
267 267 }
268 268 }
269 269 )
270 270 elif status == "fixed":
271 271 filter_part.append(
272 272 {
273 273 "has_parent": {
274 274 "parent_type": "report_group",
275 275 "query": {"term": {"fixed": True}},
276 276 }
277 277 }
278 278 )
279 279
280 280 # logging.getLogger('pyelasticsearch').setLevel(logging.DEBUG)
281 281 index_names = es_index_name_limiter(
282 282 filter_settings.get("start_date"),
283 283 filter_settings.get("end_date"),
284 284 ixtypes=["reports"],
285 285 )
286 286 if index_names:
287 287 results = Datastores.es.search(
288 288 body=query,
289 289 index=index_names,
290 290 doc_type=["report", "report_group"],
291 291 size=0,
292 292 )
293 293 else:
294 294 return []
295 295 return results["aggregations"]
296 296
297 297 @classmethod
298 298 def get_paginator_by_app_ids(
299 299 cls,
300 300 app_ids=None,
301 301 page=1,
302 302 item_count=None,
303 303 items_per_page=50,
304 304 order_by=None,
305 305 filter_settings=None,
306 306 exclude_columns=None,
307 307 db_session=None,
308 308 ):
309 309 if not filter_settings:
310 310 filter_settings = {}
311 311 results = cls.get_search_iterator(
312 312 app_ids, page, items_per_page, order_by, filter_settings
313 313 )
314 314
315 315 ordered_ids = []
316 316 if results:
317 317 for item in results["top_groups"]["buckets"]:
318 pg_id = item["top_reports_hits"]["hits"]["hits"][0]["_source"]["pg_id"]
318 pg_id = item["top_reports_hits"]["hits"]["hits"][0]["_source"]["report_id"]
319 319 ordered_ids.append(pg_id)
320 320 log.info(filter_settings)
321 321 paginator = paginate.Page(
322 322 ordered_ids, items_per_page=items_per_page, **filter_settings
323 323 )
324 324 sa_items = ()
325 325 if paginator.items:
326 326 db_session = get_db_session(db_session)
327 327 # latest report detail
328 328 query = db_session.query(Report)
329 329 query = query.options(sa.orm.joinedload(Report.report_group))
330 330 query = query.filter(Report.id.in_(paginator.items))
331 331 if filter_settings.get("order_col"):
332 332 order_col = filter_settings.get("order_col")
333 333 if filter_settings.get("order_dir") == "dsc":
334 334 sort_on = "desc"
335 335 else:
336 336 sort_on = "asc"
337 337 if order_col == "when":
338 338 order_col = "last_timestamp"
339 339 query = query.order_by(
340 340 getattr(sa, sort_on)(getattr(ReportGroup, order_col))
341 341 )
342 342 sa_items = query.all()
343 343 sorted_instance_list = []
344 344 for i_id in ordered_ids:
345 345 for report in sa_items:
346 346 if str(report.id) == i_id and report not in sorted_instance_list:
347 347 sorted_instance_list.append(report)
348 348 paginator.sa_items = sorted_instance_list
349 349 return paginator
350 350
351 351 @classmethod
352 352 def by_app_ids(cls, app_ids=None, order_by=True, db_session=None):
353 353 db_session = get_db_session(db_session)
354 354 q = db_session.query(ReportGroup)
355 355 if app_ids:
356 356 q = q.filter(ReportGroup.resource_id.in_(app_ids))
357 357 if order_by:
358 358 q = q.order_by(sa.desc(ReportGroup.id))
359 359 return q
360 360
361 361 @classmethod
362 362 def by_id(cls, group_id, app_ids=None, db_session=None):
363 363 db_session = get_db_session(db_session)
364 364 q = db_session.query(ReportGroup).filter(ReportGroup.id == int(group_id))
365 365 if app_ids:
366 366 q = q.filter(ReportGroup.resource_id.in_(app_ids))
367 367 return q.first()
368 368
369 369 @classmethod
370 370 def by_ids(cls, group_ids=None, db_session=None):
371 371 db_session = get_db_session(db_session)
372 372 query = db_session.query(ReportGroup)
373 373 query = query.filter(ReportGroup.id.in_(group_ids))
374 374 return query
375 375
376 376 @classmethod
377 377 def by_hash_and_resource(
378 378 cls, resource_id, grouping_hash, since_when=None, db_session=None
379 379 ):
380 380 db_session = get_db_session(db_session)
381 381 q = db_session.query(ReportGroup)
382 382 q = q.filter(ReportGroup.resource_id == resource_id)
383 383 q = q.filter(ReportGroup.grouping_hash == grouping_hash)
384 384 q = q.filter(ReportGroup.fixed == False)
385 385 if since_when:
386 386 q = q.filter(ReportGroup.first_timestamp >= since_when)
387 387 return q.first()
388 388
389 389 @classmethod
390 390 def users_commenting(cls, report_group, exclude_user_id=None, db_session=None):
391 391 db_session = get_db_session(None, report_group)
392 392 query = db_session.query(User).distinct()
393 393 query = query.filter(User.id == ReportComment.owner_id)
394 394 query = query.filter(ReportComment.group_id == report_group.id)
395 395 if exclude_user_id:
396 396 query = query.filter(ReportComment.owner_id != exclude_user_id)
397 397 return query
398 398
399 399 @classmethod
400 400 def affected_users_count(cls, report_group, db_session=None):
401 401 db_session = get_db_session(db_session)
402 402 query = db_session.query(sa.func.count(Report.username))
403 403 query = query.filter(Report.group_id == report_group.id)
404 404 query = query.filter(Report.username != "")
405 405 query = query.filter(Report.username != None)
406 406 query = query.group_by(Report.username)
407 407 return query.count()
408 408
409 409 @classmethod
410 410 def top_affected_users(cls, report_group, db_session=None):
411 411 db_session = get_db_session(db_session)
412 412 count_label = sa.func.count(Report.username).label("count")
413 413 query = db_session.query(Report.username, count_label)
414 414 query = query.filter(Report.group_id == report_group.id)
415 415 query = query.filter(Report.username != None)
416 416 query = query.filter(Report.username != "")
417 417 query = query.group_by(Report.username)
418 418 query = query.order_by(sa.desc(count_label))
419 419 query = query.limit(50)
420 420 return query
421 421
422 422 @classmethod
423 423 def get_report_stats(cls, request, filter_settings):
424 424 """
425 425 Gets report dashboard graphs
426 426 Returns information for BAR charts with occurences/interval information
427 427 detailed means version that returns time intervals - non detailed
428 428 returns total sum
429 429 """
430 430 delta = filter_settings["end_date"] - filter_settings["start_date"]
431 431 if delta < h.time_deltas.get("12h")["delta"]:
432 432 interval = "1m"
433 433 elif delta <= h.time_deltas.get("3d")["delta"]:
434 434 interval = "5m"
435 435 elif delta >= h.time_deltas.get("2w")["delta"]:
436 436 interval = "24h"
437 437 else:
438 438 interval = "1h"
439 439
440 440 group_id = filter_settings.get("group_id")
441 441
442 442 es_query = {
443 443 "aggs": {
444 444 "parent_agg": {
445 445 "aggs": {
446 446 "types": {
447 447 "aggs": {
448 448 "sub_agg": {"terms": {"field": "tags.type.values.keyword"}}
449 449 },
450 450 "filter": {
451 451 "bool": {
452 452 "filter": [{"exists": {"field": "tags.type.values"}}]
453 453 }
454 454 },
455 455 }
456 456 },
457 457 "date_histogram": {
458 458 "extended_bounds": {
459 459 "max": filter_settings["end_date"],
460 460 "min": filter_settings["start_date"],
461 461 },
462 462 "field": "timestamp",
463 463 "interval": interval,
464 464 "min_doc_count": 0,
465 465 },
466 466 }
467 467 },
468 468 "query": {
469 469 "bool": {
470 470 "filter": [
471 471 {
472 472 "terms": {
473 473 "resource_id": [filter_settings["resource"][0]]
474 474 }
475 475 },
476 476 {
477 477 "range": {
478 478 "timestamp": {
479 479 "gte": filter_settings["start_date"],
480 480 "lte": filter_settings["end_date"],
481 481 }
482 482 }
483 483 },
484 484 ]
485 485 }
486 486 },
487 487 }
488 488 if group_id:
489 489 parent_agg = es_query["aggs"]["parent_agg"]
490 490 filters = parent_agg["aggs"]["types"]["filter"]["bool"]["filter"]
491 491 filters.append({"terms": {"tags.group_id.values": [group_id]}})
492 492
493 493 index_names = es_index_name_limiter(
494 494 start_date=filter_settings["start_date"],
495 495 end_date=filter_settings["end_date"],
496 496 ixtypes=["reports"],
497 497 )
498 498
499 499 if not index_names:
500 500 return []
501 501
502 502 result = Datastores.es.search(
503 503 body=es_query, index=index_names, doc_type="log", size=0
504 504 )
505 505 series = []
506 506 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
507 507 point = {
508 508 "x": datetime.utcfromtimestamp(int(bucket["key"]) / 1000),
509 509 "report": 0,
510 510 "not_found": 0,
511 511 "slow_report": 0,
512 512 }
513 513 for subbucket in bucket["types"]["sub_agg"]["buckets"]:
514 514 if subbucket["key"] == "slow":
515 515 point["slow_report"] = subbucket["doc_count"]
516 516 elif subbucket["key"] == "error":
517 517 point["report"] = subbucket["doc_count"]
518 518 elif subbucket["key"] == "not_found":
519 519 point["not_found"] = subbucket["doc_count"]
520 520 series.append(point)
521 521 return series
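The top_groups aggregation above now buckets child hits by the synthetic "join_field#report_group" field, which is how an ES 6.x join field exposes the parent id to aggregations once the _parent metafield is gone. A hedged, stripped-down version of that aggregation:

parent_id_agg = {
    "size": 0,
    "aggs": {
        "top_groups": {
            # parent ids of child "report" documents
            "terms": {"field": "join_field#report_group", "size": 100}
        }
    },
}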
@@ -1,612 +1,612 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from datetime import datetime
18 18
19 19 import appenlight.lib.helpers as h
20 20 from appenlight.models import get_db_session, Datastores
21 21 from appenlight.models.services.base import BaseService
22 22 from appenlight.lib.enums import ReportType
23 23 from appenlight.lib.utils import es_index_name_limiter
24 24
25 25 try:
26 26 from ae_uptime_ce.models.services.uptime_metric import UptimeMetricService
27 27 except ImportError:
28 28 UptimeMetricService = None
29 29
30 30
31 31 def check_key(key, stats, uptime, total_seconds):
32 32 if key not in stats:
33 33 stats[key] = {
34 34 "name": key,
35 35 "requests": 0,
36 36 "errors": 0,
37 37 "tolerated_requests": 0,
38 38 "frustrating_requests": 0,
39 39 "satisfying_requests": 0,
40 40 "total_minutes": total_seconds / 60.0,
41 41 "uptime": uptime,
42 42 "apdex": 0,
43 43 "rpm": 0,
44 44 "response_time": 0,
45 45 "avg_response_time": 0,
46 46 }
47 47
48 48
49 49 class RequestMetricService(BaseService):
50 50 @classmethod
51 51 def get_metrics_stats(cls, request, filter_settings, db_session=None):
52 52 delta = filter_settings["end_date"] - filter_settings["start_date"]
53 53 if delta < h.time_deltas.get("12h")["delta"]:
54 54 interval = "1m"
55 55 elif delta <= h.time_deltas.get("3d")["delta"]:
56 56 interval = "5m"
57 57 elif delta >= h.time_deltas.get("2w")["delta"]:
58 58 interval = "24h"
59 59 else:
60 60 interval = "1h"
61 61
62 62 filter_settings["namespace"] = ["appenlight.request_metric"]
63 63
64 64 es_query = {
65 65 "aggs": {
66 66 "parent_agg": {
67 67 "aggs": {
68 68 "custom": {
69 69 "aggs": {
70 70 "sub_agg": {
71 71 "sum": {"field": "tags.custom.numeric_values"}
72 72 }
73 73 },
74 74 "filter": {
75 75 "exists": {"field": "tags.custom.numeric_values"}
76 76 },
77 77 },
78 78 "main": {
79 79 "aggs": {
80 80 "sub_agg": {
81 81 "sum": {"field": "tags.main.numeric_values"}
82 82 }
83 83 },
84 84 "filter": {"exists": {"field": "tags.main.numeric_values"}},
85 85 },
86 86 "nosql": {
87 87 "aggs": {
88 88 "sub_agg": {
89 89 "sum": {"field": "tags.nosql.numeric_values"}
90 90 }
91 91 },
92 92 "filter": {
93 93 "exists": {"field": "tags.nosql.numeric_values"}
94 94 },
95 95 },
96 96 "remote": {
97 97 "aggs": {
98 98 "sub_agg": {
99 99 "sum": {"field": "tags.remote.numeric_values"}
100 100 }
101 101 },
102 102 "filter": {
103 103 "exists": {"field": "tags.remote.numeric_values"}
104 104 },
105 105 },
106 106 "requests": {
107 107 "aggs": {
108 108 "sub_agg": {
109 109 "sum": {"field": "tags.requests.numeric_values"}
110 110 }
111 111 },
112 112 "filter": {
113 113 "exists": {"field": "tags.requests.numeric_values"}
114 114 },
115 115 },
116 116 "sql": {
117 117 "aggs": {
118 118 "sub_agg": {"sum": {"field": "tags.sql.numeric_values"}}
119 119 },
120 120 "filter": {"exists": {"field": "tags.sql.numeric_values"}},
121 121 },
122 122 "tmpl": {
123 123 "aggs": {
124 124 "sub_agg": {
125 125 "sum": {"field": "tags.tmpl.numeric_values"}
126 126 }
127 127 },
128 128 "filter": {"exists": {"field": "tags.tmpl.numeric_values"}},
129 129 },
130 130 },
131 131 "date_histogram": {
132 132 "extended_bounds": {
133 133 "max": filter_settings["end_date"],
134 134 "min": filter_settings["start_date"],
135 135 },
136 136 "field": "timestamp",
137 137 "interval": interval,
138 138 "min_doc_count": 0,
139 139 },
140 140 }
141 141 },
142 142 "query": {
143 143 "bool": {
144 144 "filter": [
145 145 {
146 146 "terms": {
147 147 "resource_id": [filter_settings["resource"][0]]
148 148 }
149 149 },
150 150 {
151 151 "range": {
152 152 "timestamp": {
153 153 "gte": filter_settings["start_date"],
154 154 "lte": filter_settings["end_date"],
155 155 }
156 156 }
157 157 },
158 158 {"terms": {"namespace": ["appenlight.request_metric"]}},
159 159 ]
160 160 }
161 161 },
162 162 }
163 163
164 164 index_names = es_index_name_limiter(
165 165 start_date=filter_settings["start_date"],
166 166 end_date=filter_settings["end_date"],
167 167 ixtypes=["metrics"],
168 168 )
169 169 if not index_names:
170 170 return []
171 171
172 172 result = Datastores.es.search(
173 173 body=es_query, index=index_names, doc_type="log", size=0
174 174 )
175 175
176 176 plot_data = []
177 177 for item in result["aggregations"]["parent_agg"]["buckets"]:
178 178 x_time = datetime.utcfromtimestamp(int(item["key"]) / 1000)
179 179 point = {"x": x_time}
180 180 for key in ["custom", "main", "nosql", "remote", "requests", "sql", "tmpl"]:
181 181 value = item[key]["sub_agg"]["value"]
182 182 point[key] = round(value, 3) if value else 0
183 183 plot_data.append(point)
184 184
185 185 return plot_data
186 186
187 187 @classmethod
188 188 def get_requests_breakdown(cls, request, filter_settings, db_session=None):
189 189 db_session = get_db_session(db_session)
190 190
191 191 # fetch total time of all requests in this time range
192 192 index_names = es_index_name_limiter(
193 193 start_date=filter_settings["start_date"],
194 194 end_date=filter_settings["end_date"],
195 195 ixtypes=["metrics"],
196 196 )
197 197
198 198 if index_names and filter_settings["resource"]:
199 199 es_query = {
200 200 "aggs": {
201 201 "main": {
202 202 "aggs": {
203 203 "sub_agg": {"sum": {"field": "tags.main.numeric_values"}}
204 204 },
205 205 "filter": {"exists": {"field": "tags.main.numeric_values"}},
206 206 }
207 207 },
208 208 "query": {
209 209 "bool": {
210 210 "filter": [
211 211 {
212 212 "terms": {
213 213 "resource_id": [filter_settings["resource"][0]]
214 214 }
215 215 },
216 216 {
217 217 "range": {
218 218 "timestamp": {
219 219 "gte": filter_settings["start_date"],
220 220 "lte": filter_settings["end_date"],
221 221 }
222 222 }
223 223 },
224 224 {"terms": {"namespace": ["appenlight.request_metric"]}},
225 225 ]
226 226 }
227 227 },
228 228 }
229 229 result = Datastores.es.search(
230 230 body=es_query, index=index_names, doc_type="log", size=0
231 231 )
232 232 total_time_spent = result["aggregations"]["main"]["sub_agg"]["value"]
233 233 else:
234 234 total_time_spent = 0
235 235 script_text = "doc['tags.main.numeric_values'].value / {}".format(
236 236 total_time_spent
237 237 )
238 238 if total_time_spent == 0:
239 239 script_text = '0'
240 240
241 241 if index_names and filter_settings["resource"]:
242 242 es_query = {
243 243 "aggs": {
244 244 "parent_agg": {
245 245 "aggs": {
246 246 "main": {
247 247 "aggs": {
248 248 "sub_agg": {
249 249 "sum": {"field": "tags.main.numeric_values"}
250 250 }
251 251 },
252 252 "filter": {
253 253 "exists": {"field": "tags.main.numeric_values"}
254 254 },
255 255 },
256 256 "percentage": {
257 257 "aggs": {
258 258 "sub_agg": {
259 259 "sum": {
260 260 "script": script_text,
261 261 }
262 262 }
263 263 },
264 264 "filter": {
265 265 "exists": {"field": "tags.main.numeric_values"}
266 266 },
267 267 },
268 268 "requests": {
269 269 "aggs": {
270 270 "sub_agg": {
271 271 "sum": {"field": "tags.requests.numeric_values"}
272 272 }
273 273 },
274 274 "filter": {
275 275 "exists": {"field": "tags.requests.numeric_values"}
276 276 },
277 277 },
278 278 },
279 279 "terms": {
280 280 "field": "tags.view_name.values.keyword",
281 281 "order": {"percentage>sub_agg": "desc"},
282 282 "size": 15,
283 283 },
284 284 }
285 285 },
286 286 "query": {
287 287 "bool": {
288 288 "filter": [
289 289 {
290 290 "terms": {
291 291 "resource_id": [filter_settings["resource"][0]]
292 292 }
293 293 },
294 294 {
295 295 "range": {
296 296 "timestamp": {
297 297 "gte": filter_settings["start_date"],
298 298 "lte": filter_settings["end_date"],
299 299 }
300 300 }
301 301 },
302 302 ]
303 303 }
304 304 },
305 305 }
306 306 result = Datastores.es.search(
307 307 body=es_query, index=index_names, doc_type="log", size=0
308 308 )
309 309 series = result["aggregations"]["parent_agg"]["buckets"]
310 310 else:
311 311 series = []
312 312
313 313 and_part = [
314 314 {"term": {"resource_id": filter_settings["resource"][0]}},
315 315 {"terms": {"tags.view_name.values": [row["key"] for row in series]}},
316 316 {"term": {"report_type": str(ReportType.slow)}},
317 317 ]
318 318 query = {
319 319 "aggs": {
320 320 "top_reports": {
321 321 "terms": {"field": "tags.view_name.values.keyword", "size": len(series)},
322 322 "aggs": {
323 323 "top_calls_hits": {
324 324 "top_hits": {"sort": {"start_time": "desc"}, "size": 5}
325 325 }
326 326 },
327 327 }
328 328 },
329 329 "query": {"bool": {"filter": and_part}},
330 330 }
331 331 details = {}
332 332 index_names = es_index_name_limiter(ixtypes=["reports"])
333 333 if index_names and series:
334 334 result = Datastores.es.search(
335 335 body=query, doc_type="report", size=0, index=index_names
336 336 )
337 337 for bucket in result["aggregations"]["top_reports"]["buckets"]:
338 338 details[bucket["key"]] = []
339 339
340 340 for hit in bucket["top_calls_hits"]["hits"]["hits"]:
341 341 details[bucket["key"]].append(
342 342 {
343 "report_id": hit["_source"]["pg_id"],
343 "report_id": hit["_source"]["request_metric_id"],
344 344 "group_id": hit["_source"]["group_id"],
345 345 }
346 346 )
347 347
348 348 results = []
349 349 for row in series:
350 350 result = {
351 351 "key": row["key"],
352 352 "main": row["main"]["sub_agg"]["value"],
353 353 "requests": row["requests"]["sub_agg"]["value"],
354 354 }
355 355 # es can return 'infinity'
356 356 try:
357 357 result["percentage"] = float(row["percentage"]["sub_agg"]["value"])
358 358 except ValueError:
359 359 result["percentage"] = 0
360 360
361 361 result["latest_details"] = details.get(row["key"]) or []
362 362 results.append(result)
363 363
364 364 return results
365 365
366 366 @classmethod
367 367 def get_apdex_stats(cls, request, filter_settings, threshold=1, db_session=None):
368 368 """
369 369 Returns server information and calculates the APDEX score per server
370 370 for the dashboard stats boxes (upper right)
371 371 """
372 372 # Apdex t = (Satisfied Count + Tolerated Count / 2) / Total Samples
373 373 db_session = get_db_session(db_session)
374 374 index_names = es_index_name_limiter(
375 375 start_date=filter_settings["start_date"],
376 376 end_date=filter_settings["end_date"],
377 377 ixtypes=["metrics"],
378 378 )
379 379
380 380 requests_series = []
381 381
382 382 if index_names and filter_settings["resource"]:
383 383 es_query = {
384 384 "aggs": {
385 385 "parent_agg": {
386 386 "aggs": {
387 387 "frustrating": {
388 388 "aggs": {
389 389 "sub_agg": {
390 390 "sum": {"field": "tags.requests.numeric_values"}
391 391 }
392 392 },
393 393 "filter": {
394 394 "bool": {
395 395 "filter": [
396 396 {
397 397 "range": {
398 398 "tags.main.numeric_values": {"gte": "4"}
399 399 }
400 400 },
401 401 {
402 402 "exists": {
403 403 "field": "tags.requests.numeric_values"
404 404 }
405 405 },
406 406 ]
407 407 }
408 408 },
409 409 },
410 410 "main": {
411 411 "aggs": {
412 412 "sub_agg": {
413 413 "sum": {"field": "tags.main.numeric_values"}
414 414 }
415 415 },
416 416 "filter": {
417 417 "exists": {"field": "tags.main.numeric_values"}
418 418 },
419 419 },
420 420 "requests": {
421 421 "aggs": {
422 422 "sub_agg": {
423 423 "sum": {"field": "tags.requests.numeric_values"}
424 424 }
425 425 },
426 426 "filter": {
427 427 "exists": {"field": "tags.requests.numeric_values"}
428 428 },
429 429 },
430 430 "tolerated": {
431 431 "aggs": {
432 432 "sub_agg": {
433 433 "sum": {"field": "tags.requests.numeric_values"}
434 434 }
435 435 },
436 436 "filter": {
437 437 "bool": {"filter": [
438 438 {
439 439 "range": {
440 440 "tags.main.numeric_values": {"gte": "1"}
441 441 }
442 442 },
443 443 {
444 444 "range": {
445 445 "tags.main.numeric_values": {"lt": "4"}
446 446 }
447 447 },
448 448 {
449 449 "exists": {
450 450 "field": "tags.requests.numeric_values"
451 451 }
452 452 },
453 453 ]}
454 454 },
455 455 },
456 456 },
457 457 "terms": {"field": "tags.server_name.values.keyword", "size": 999999},
458 458 }
459 459 },
460 460 "query": {
461 461 "bool": {
462 462 "filter": [
463 463 {
464 464 "terms": {
465 465 "resource_id": [filter_settings["resource"][0]]
466 466 }
467 467 },
468 468 {
469 469 "range": {
470 470 "timestamp": {
471 471 "gte": filter_settings["start_date"],
472 472 "lte": filter_settings["end_date"],
473 473 }
474 474 }
475 475 },
476 476 {"terms": {"namespace": ["appenlight.request_metric"]}},
477 477 ]
478 478 }
479 479 },
480 480 }
481 481
482 482 result = Datastores.es.search(
483 483 body=es_query, index=index_names, doc_type="log", size=0
484 484 )
485 485 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
486 486 requests_series.append(
487 487 {
488 488 "frustrating": bucket["frustrating"]["sub_agg"]["value"],
489 489 "main": bucket["main"]["sub_agg"]["value"],
490 490 "requests": bucket["requests"]["sub_agg"]["value"],
491 491 "tolerated": bucket["tolerated"]["sub_agg"]["value"],
492 492 "key": bucket["key"],
493 493 }
494 494 )
495 495
496 496 since_when = filter_settings["start_date"]
497 497 until = filter_settings["end_date"]
498 498
499 499 # total errors
500 500
501 501 index_names = es_index_name_limiter(
502 502 start_date=filter_settings["start_date"],
503 503 end_date=filter_settings["end_date"],
504 504 ixtypes=["reports"],
505 505 )
506 506
507 507 report_series = []
508 508 if index_names and filter_settings["resource"]:
509 509 report_type = ReportType.key_from_value(ReportType.error)
510 510 es_query = {
511 511 "aggs": {
512 512 "parent_agg": {
513 513 "aggs": {
514 514 "errors": {
515 515 "aggs": {
516 516 "sub_agg": {
517 517 "sum": {
518 518 "field": "tags.occurences.numeric_values"
519 519 }
520 520 }
521 521 },
522 522 "filter": {
523 523 "bool": {
524 524 "filter": [
525 525 {"terms": {"tags.type.values": [report_type]}},
526 526 {
527 527 "exists": {
528 528 "field": "tags.occurences.numeric_values"
529 529 }
530 530 },
531 531 ]
532 532 }
533 533 },
534 534 }
535 535 },
536 536 "terms": {"field": "tags.server_name.values.keyword", "size": 999999},
537 537 }
538 538 },
539 539 "query": {
540 540 "bool": {
541 541 "filter": [
542 542 {
543 543 "terms": {
544 544 "resource_id": [filter_settings["resource"][0]]
545 545 }
546 546 },
547 547 {
548 548 "range": {
549 549 "timestamp": {
550 550 "gte": filter_settings["start_date"],
551 551 "lte": filter_settings["end_date"],
552 552 }
553 553 }
554 554 },
555 555 {"terms": {"namespace": ["appenlight.error"]}},
556 556 ]
557 557 }
558 558 },
559 559 }
560 560 result = Datastores.es.search(
561 561 body=es_query, index=index_names, doc_type="log", size=0
562 562 )
563 563 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
564 564 report_series.append(
565 565 {
566 566 "key": bucket["key"],
567 567 "errors": bucket["errors"]["sub_agg"]["value"],
568 568 }
569 569 )
570 570
571 571 stats = {}
572 572 if UptimeMetricService is not None:
573 573 uptime = UptimeMetricService.get_uptime_by_app(
574 574 filter_settings["resource"][0], since_when=since_when, until=until
575 575 )
576 576 else:
577 577 uptime = 0
578 578
579 579 total_seconds = (until - since_when).total_seconds()
580 580
581 581 for stat in requests_series:
582 582 check_key(stat["key"], stats, uptime, total_seconds)
583 583 stats[stat["key"]]["requests"] = int(stat["requests"])
584 584 stats[stat["key"]]["response_time"] = stat["main"]
585 585 stats[stat["key"]]["tolerated_requests"] = stat["tolerated"]
586 586 stats[stat["key"]]["frustrating_requests"] = stat["frustrating"]
587 587 for server in report_series:
588 588 check_key(server["key"], stats, uptime, total_seconds)
589 589 stats[server["key"]]["errors"] = server["errors"]
590 590
591 591 server_stats = list(stats.values())
592 592 for stat in server_stats:
593 593 stat["satisfying_requests"] = (
594 594 stat["requests"]
595 595 - stat["errors"]
596 596 - stat["frustrating_requests"]
597 597 - stat["tolerated_requests"]
598 598 )
599 599 if stat["satisfying_requests"] < 0:
600 600 stat["satisfying_requests"] = 0
601 601
602 602 if stat["requests"]:
603 603 stat["avg_response_time"] = round(
604 604 stat["response_time"] / stat["requests"], 3
605 605 )
606 606 qual_requests = (
607 607 stat["satisfying_requests"] + stat["tolerated_requests"] / 2.0
608 608 )
609 609 stat["apdex"] = round((qual_requests / stat["requests"]) * 100, 2)
610 610 stat["rpm"] = round(stat["requests"] / stat["total_minutes"], 2)
611 611
612 612 return sorted(server_stats, key=lambda x: x["name"])
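A hedged worked example of the apdex/rpm arithmetic implemented above, with made-up counts for a 6 hour window:

requests, errors, frustrating, tolerated = 1000, 20, 30, 150
satisfying = requests - errors - frustrating - tolerated             # 800
apdex = round(((satisfying + tolerated / 2.0) / requests) * 100, 2)  # 87.5
rpm = round(requests / (6 * 60.0), 2)                                # 2.78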
@@ -1,127 +1,127 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import sqlalchemy as sa
18 18 import hashlib
19 19
20 20 from datetime import datetime, timedelta
21 21 from appenlight.models import Base
22 22 from sqlalchemy.dialects.postgresql import JSON
23 23 from ziggurat_foundations.models.base import BaseModel
24 24
25 25
26 26 class SlowCall(Base, BaseModel):
27 27 __tablename__ = "slow_calls"
28 28 __table_args__ = {"implicit_returning": False}
29 29
30 30 resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
31 31 id = sa.Column(sa.Integer, nullable=False, primary_key=True)
32 32 report_id = sa.Column(
33 33 sa.BigInteger,
34 34 sa.ForeignKey("reports.id", ondelete="cascade", onupdate="cascade"),
35 35 primary_key=True,
36 36 )
37 37 duration = sa.Column(sa.Float(), default=0)
38 38 statement = sa.Column(sa.UnicodeText(), default="")
39 39 statement_hash = sa.Column(sa.Unicode(60), default="")
40 40 parameters = sa.Column(JSON(), nullable=False, default=dict)
41 41 type = sa.Column(sa.Unicode(16), default="")
42 42 subtype = sa.Column(sa.Unicode(16), default=None)
43 43 location = sa.Column(sa.Unicode(255), default="")
44 44 timestamp = sa.Column(
45 45 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
46 46 )
47 47 report_group_time = sa.Column(
48 48 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
49 49 )
50 50
51 51 def set_data(
52 52 self, data, protocol_version=None, resource_id=None, report_group=None
53 53 ):
54 54 self.resource_id = resource_id
55 55 if data.get("start") and data.get("end"):
56 56 self.timestamp = data.get("start")
57 57 d = data.get("end") - data.get("start")
58 58 self.duration = d.total_seconds()
59 59 self.statement = data.get("statement", "")
60 60 self.type = data.get("type", "unknown")[:16]
61 61 self.parameters = data.get("parameters", {})
62 62 self.location = data.get("location", "")[:255]
63 63 self.report_group_time = report_group.first_timestamp
64 64 if "subtype" in data:
65 65 self.subtype = data.get("subtype", "unknown")[:16]
66 66 if self.type == "tmpl":
67 67 self.set_hash("{} {}".format(self.statement, self.parameters))
68 68 else:
69 69 self.set_hash()
70 70
71 71 def set_hash(self, custom_statement=None):
72 72 statement = custom_statement or self.statement
73 73 self.statement_hash = hashlib.sha1(statement.encode("utf8")).hexdigest()
74 74
75 75 @property
76 76 def end_time(self):
77 77 if self.duration and self.timestamp:
78 78 return self.timestamp + timedelta(seconds=self.duration)
79 79 return None
80 80
81 81 def get_dict(self):
82 82 instance_dict = super(SlowCall, self).get_dict()
83 83 instance_dict["children"] = []
84 84 instance_dict["end_time"] = self.end_time
85 85 return instance_dict
86 86
87 87 def es_doc(self):
88 88 doc = {
89 89 "resource_id": self.resource_id,
90 90 "timestamp": self.timestamp,
91 "pg_id": str(self.id),
91 "slow_call_id": str(self.id),
92 92 "permanent": False,
93 93 "request_id": None,
94 94 "log_level": "UNKNOWN",
95 95 "message": self.statement,
96 96 "namespace": "appenlight.slow_call",
97 97 "tags": {
98 98 "report_id": {
99 99 "values": self.report_id,
100 100 "numeric_values": self.report_id,
101 101 },
102 102 "duration": {"values": None, "numeric_values": self.duration},
103 103 "statement_hash": {
104 104 "values": self.statement_hash,
105 105 "numeric_values": None,
106 106 },
107 107 "type": {"values": self.type, "numeric_values": None},
108 108 "subtype": {"values": self.subtype, "numeric_values": None},
109 109 "location": {"values": self.location, "numeric_values": None},
110 110 "parameters": {"values": None, "numeric_values": None},
111 111 },
112 112 "tag_list": [
113 113 "report_id",
114 114 "duration",
115 115 "statement_hash",
116 116 "type",
117 117 "subtype",
118 118 "location",
119 119 ],
120 120 }
121 121 if isinstance(self.parameters, str):
122 122 doc["tags"]["parameters"]["values"] = self.parameters[:255]
123 123 return doc
124 124
125 125 @property
126 126 def partition_id(self):
127 127 return "rcae_sc_%s" % self.report_group_time.strftime("%Y_%m")
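Reviewer note: with pg_id renamed to slow_call_id the document shape changes, but routing to monthly indices stays the same; partition_id still derives the index name from report_group_time. A tiny illustration (the date is arbitrary):

    # how SlowCall.partition_id maps a timestamp to a monthly slow-call index name
    from datetime import datetime

    report_group_time = datetime(2017, 5, 14, 12, 30)
    print("rcae_sc_%s" % report_group_time.strftime("%Y_%m"))  # rcae_sc_2017_05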
@@ -1,458 +1,577 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import argparse
18 18 import datetime
19 19 import logging
20 import copy
20 21
21 22 import sqlalchemy as sa
22 23 import elasticsearch.exceptions
23 24 import elasticsearch.helpers
24 25
25 26 from collections import defaultdict
26 27 from pyramid.paster import setup_logging
27 28 from pyramid.paster import bootstrap
28 29 from appenlight.models import DBSession, Datastores, metadata
29 30 from appenlight.lib import get_callable
30 31 from appenlight.models.report_group import ReportGroup
31 32 from appenlight.models.report import Report
32 33 from appenlight.models.report_stat import ReportStat
33 34 from appenlight.models.log import Log
34 35 from appenlight.models.slow_call import SlowCall
35 36 from appenlight.models.metric import Metric
36 37
37
38 38 log = logging.getLogger(__name__)
39 39
40 40 tables = {
41 41 "slow_calls_p_": [],
42 42 "reports_stats_p_": [],
43 43 "reports_p_": [],
44 44 "reports_groups_p_": [],
45 45 "logs_p_": [],
46 46 "metrics_p_": [],
47 47 }
48 48
49 49
50 50 def detect_tables(table_prefix):
51 51 found_tables = []
52 52 db_tables_query = """
53 53 SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
54 54 tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;"""
55 55
56 56 for table in DBSession.execute(db_tables_query).fetchall():
57 57 tablename = table.tablename
58 58 if tablename.startswith(table_prefix):
59 59 t = sa.Table(
60 60 tablename, metadata, autoload=True, autoload_with=DBSession.bind.engine
61 61 )
62 62 found_tables.append(t)
63 63 return found_tables
64 64
65 65
66 66 def main():
67 67 """
68 68 Recreates the Elasticsearch index templates and
69 69 performs a full reindex of the database into Elasticsearch
70 70
71 71 """
72 72
73 73 # we need the parser twice: first to load the ini file and
74 74 # bootstrap pyramid, then to load the plugins
75 75 pre_parser = argparse.ArgumentParser(
76 76 description="Reindex AppEnlight data", add_help=False
77 77 )
78 78 pre_parser.add_argument(
79 79 "-c", "--config", required=True, help="Configuration ini file of application"
80 80 )
81 81 pre_parser.add_argument("-h", "--help", help="Show help", nargs="?")
82 82 pre_parser.add_argument(
83 83 "-t", "--types", nargs="+", help="Which parts of database should get reindexed"
84 84 )
85 85 args = pre_parser.parse_args()
86 86
87 87 config_uri = args.config
88 88 setup_logging(config_uri)
89 89 log.setLevel(logging.INFO)
90 90 env = bootstrap(config_uri)
91 91 parser = argparse.ArgumentParser(description="Reindex AppEnlight data")
92 92 choices = {
93 93 "reports": "appenlight.scripts.reindex_elasticsearch:reindex_reports",
94 94 "logs": "appenlight.scripts.reindex_elasticsearch:reindex_logs",
95 95 "metrics": "appenlight.scripts.reindex_elasticsearch:reindex_metrics",
96 96 "slow_calls": "appenlight.scripts.reindex_elasticsearch:reindex_slow_calls",
97 97 "template": "appenlight.scripts.reindex_elasticsearch:update_template",
98 98 }
99 99 for k, v in env["registry"].appenlight_plugins.items():
100 100 if v.get("fulltext_indexer"):
101 101 choices[k] = v["fulltext_indexer"]
102 102 parser.add_argument(
103 103 "-t",
104 104 "--types",
105 105 nargs="*",
106 106 choices=["all"] + list(choices.keys()),
107 107 default=[],
108 108 help="Which parts of database should get reindexed",
109 109 )
110 110 parser.add_argument(
111 111 "-c", "--config", required=True, help="Configuration ini file of application"
112 112 )
113 113 args = parser.parse_args()
114 114
115 115 if "all" in args.types:
116 116 args.types = list(choices.keys())
117 117
118 118 print("Selected types to reindex: {}".format(args.types))
119 119
120 120 log.info("settings {}".format(args.types))
121 121
122 122 if "template" in args.types:
123 123 get_callable(choices["template"])()
124 124 args.types.remove("template")
125 125 for selected in args.types:
126 126 get_callable(choices[selected])()
127 127
128 128
129 129 def update_template():
130 130 try:
131 Datastores.es.indices.delete_template("rcae")
131 Datastores.es.indices.delete_template("rcae_reports")
132 except elasticsearch.exceptions.NotFoundError as e:
133 log.error(e)
134
135 try:
136 Datastores.es.indices.delete_template("rcae_logs")
137 except elasticsearch.exceptions.NotFoundError as e:
138 log.error(e)
139 try:
140 Datastores.es.indices.delete_template("rcae_slow_calls")
141 except elasticsearch.exceptions.NotFoundError as e:
142 log.error(e)
143 try:
144 Datastores.es.indices.delete_template("rcae_metrics")
132 145 except elasticsearch.exceptions.NotFoundError as e:
133 146 log.error(e)
134 147 log.info("updating elasticsearch template")
135 148 tag_templates = [
136 149 {
137 150 "values": {
138 151 "path_match": "tags.*",
139 152 "mapping": {
140 153 "type": "object",
141 154 "properties": {
142 155 "values": {"type": "text", "analyzer": "tag_value",
143 156 "fields": {
144 157 "keyword": {
145 158 "type": "keyword",
146 159 "ignore_above": 256
147 160 }
148 161 }},
149 162 "numeric_values": {"type": "float"},
150 163 },
151 164 },
152 165 }
153 166 }
154 167 ]
155 168
156 template_schema = {
157 "template": "rcae_*",
169 shared_analysis = {
170 "analyzer": {
171 "url_path": {
172 "type": "custom",
173 "char_filter": [],
174 "tokenizer": "path_hierarchy",
175 "filter": [],
176 },
177 "tag_value": {
178 "type": "custom",
179 "char_filter": [],
180 "tokenizer": "keyword",
181 "filter": ["lowercase"],
182 },
183 }
184 }
185
186 shared_log_mapping = {
187 "_all": {"enabled": False},
188 "dynamic_templates": tag_templates,
189 "properties": {
190 "pg_id": {"type": "keyword", "index": True},
191 "delete_hash": {"type": "keyword", "index": True},
192 "resource_id": {"type": "integer"},
193 "timestamp": {"type": "date"},
194 "permanent": {"type": "boolean"},
195 "request_id": {"type": "keyword", "index": True},
196 "log_level": {"type": "text", "analyzer": "simple"},
197 "message": {"type": "text", "analyzer": "simple"},
198 "namespace": {
199 "type": "text",
200 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
201 },
202 "tags": {"type": "object"},
203 "tag_list": {"type": "text", "analyzer": "tag_value",
204 "fields": {
205 "keyword": {
206 "type": "keyword",
207 "ignore_above": 256
208 }
209 }},
210 },
211 }
212
213 report_schema = {
214 "template": "rcae_r_*",
158 215 "settings": {
159 216 "index": {
160 217 "refresh_interval": "5s",
161 218 "translog": {"sync_interval": "5s", "durability": "async"},
219 "mapping": {"single_type": True}
162 220 },
163 221 "number_of_shards": 5,
164 "analysis": {
165 "analyzer": {
166 "url_path": {
167 "type": "custom",
168 "char_filter": [],
169 "tokenizer": "path_hierarchy",
170 "filter": [],
171 },
172 "tag_value": {
173 "type": "custom",
174 "char_filter": [],
175 "tokenizer": "keyword",
176 "filter": ["lowercase"],
177 },
178 }
179 },
222 "analysis": shared_analysis,
180 223 },
181 224 "mappings": {
182 "report_group": {
225 "report": {
183 226 "_all": {"enabled": False},
184 227 "dynamic_templates": tag_templates,
185 228 "properties": {
186 "pg_id": {"type": "keyword", "index": True},
229 "type": {"type": "keyword", "index": True},
230 # report group
231 "group_id": {"type": "keyword", "index": True},
187 232 "resource_id": {"type": "integer"},
188 233 "priority": {"type": "integer"},
189 234 "error": {"type": "text", "analyzer": "simple"},
190 235 "read": {"type": "boolean"},
191 236 "occurences": {"type": "integer"},
192 237 "fixed": {"type": "boolean"},
193 238 "first_timestamp": {"type": "date"},
194 239 "last_timestamp": {"type": "date"},
195 240 "average_duration": {"type": "float"},
196 241 "summed_duration": {"type": "float"},
197 242 "public": {"type": "boolean"},
198 },
199 },
200 "report": {
201 "_all": {"enabled": False},
202 "dynamic_templates": tag_templates,
203 "properties": {
204 "pg_id": {"type": "keyword", "index": True},
205 "resource_id": {"type": "integer"},
206 "group_id": {"type": "keyword"},
243 # report
244
245 "report_id": {"type": "keyword", "index": True},
207 246 "http_status": {"type": "integer"},
208 247 "ip": {"type": "keyword", "index": True},
209 248 "url_domain": {"type": "text", "analyzer": "simple"},
210 249 "url_path": {"type": "text", "analyzer": "url_path"},
211 "error": {"type": "text", "analyzer": "simple"},
212 250 "report_type": {"type": "integer"},
213 251 "start_time": {"type": "date"},
214 252 "request_id": {"type": "keyword", "index": True},
215 253 "end_time": {"type": "date"},
216 254 "duration": {"type": "float"},
217 255 "tags": {"type": "object"},
218 256 "tag_list": {"type": "text", "analyzer": "tag_value",
219 257 "fields": {
220 258 "keyword": {
221 259 "type": "keyword",
222 260 "ignore_above": 256
223 261 }
224 262 }},
225 263 "extra": {"type": "object"},
226 },
227 "_parent": {"type": "report_group"},
228 },
229 "log": {
230 "_all": {"enabled": False},
231 "dynamic_templates": tag_templates,
232 "properties": {
233 "pg_id": {"type": "keyword", "index": True},
234 "delete_hash": {"type": "keyword", "index": True},
235 "resource_id": {"type": "integer"},
264
265 # report stats
266
267 "report_stat_id": {"type": "keyword", "index": True},
236 268 "timestamp": {"type": "date"},
237 269 "permanent": {"type": "boolean"},
238 "request_id": {"type": "keyword", "index": True},
239 270 "log_level": {"type": "text", "analyzer": "simple"},
240 271 "message": {"type": "text", "analyzer": "simple"},
241 272 "namespace": {
242 273 "type": "text",
243 274 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
244 275 },
245 "tags": {"type": "object"},
246 "tag_list": {"type": "text", "analyzer": "tag_value",
247 "fields": {
248 "keyword": {
249 "type": "keyword",
250 "ignore_above": 256
251 }
252 }},
276
277 "join_field": {
278 "type": "join",
279 "relations": {
280 "report_group": ["report", "report_stat"]
281 }
282 }
283
253 284 },
285 }
286 }
287 }
288
289 Datastores.es.indices.put_template("rcae_reports", body=report_schema)
290
291 logs_mapping = copy.deepcopy(shared_log_mapping)
292 logs_mapping["properties"]["log_id"] = logs_mapping["properties"]["pg_id"]
293 del logs_mapping["properties"]["pg_id"]
294
295 log_template = {
296 "template": "rcae_l_*",
297 "settings": {
298 "index": {
299 "refresh_interval": "5s",
300 "translog": {"sync_interval": "5s", "durability": "async"},
301 "mapping": {"single_type": True}
254 302 },
303 "number_of_shards": 5,
304 "analysis": shared_analysis,
305 },
306 "mappings": {
307 "log": logs_mapping,
255 308 },
256 309 }
257 310
258 Datastores.es.indices.put_template("rcae", body=template_schema)
311 Datastores.es.indices.put_template("rcae_logs", body=log_template)
312
313 slow_call_mapping = copy.deepcopy(shared_log_mapping)
314 slow_call_mapping["properties"]["slow_call_id"] = slow_call_mapping["properties"]["pg_id"]
315 del slow_call_mapping["properties"]["pg_id"]
316
317 slow_call_template = {
318 "template": "rcae_sc_*",
319 "settings": {
320 "index": {
321 "refresh_interval": "5s",
322 "translog": {"sync_interval": "5s", "durability": "async"},
323 "mapping": {"single_type": True}
324 },
325 "number_of_shards": 5,
326 "analysis": shared_analysis,
327 },
328 "mappings": {
329 "log": slow_call_mapping,
330 },
331 }
332
333 Datastores.es.indices.put_template("rcae_slow_calls", body=slow_call_template)
334
335 metric_mapping = copy.deepcopy(shared_log_mapping)
336 metric_mapping["properties"]["metric_id"] = metric_mapping["properties"]["pg_id"]
337 del metric_mapping["properties"]["pg_id"]
338
339 metrics_template = {
340 "template": "rcae_m_*",
341 "settings": {
342 "index": {
343 "refresh_interval": "5s",
344 "translog": {"sync_interval": "5s", "durability": "async"},
345 "mapping": {"single_type": True}
346 },
347 "number_of_shards": 5,
348 "analysis": shared_analysis,
349 },
350 "mappings": {
351 "log": metric_mapping,
352 },
353 }
354
355 Datastores.es.indices.put_template("rcae_metrics", body=metrics_template)
356
357 uptime_metric_mapping = copy.deepcopy(shared_log_mapping)
358 uptime_metric_mapping["properties"]["uptime_id"] = uptime_metric_mapping["properties"]["pg_id"]
359 del uptime_metric_mapping["properties"]["pg_id"]
360
361 uptime_metrics_template = {
362 "template": "rcae_uptime_ce_*",
363 "settings": {
364 "index": {
365 "refresh_interval": "5s",
366 "translog": {"sync_interval": "5s", "durability": "async"},
367 "mapping": {"single_type": True}
368 },
369 "number_of_shards": 5,
370 "analysis": shared_analysis,
371 },
372 "mappings": {
373 "log": shared_log_mapping,
374 },
375 }
376
377 Datastores.es.indices.put_template("rcae_uptime_metrics", body=uptime_metrics_template)
259 378
260 379
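Reviewer note: the rcae_reports template above now models the group / report / stat relationship with an ES 6 join field instead of _parent. Purely as a generic sketch of how documents would be written under such a mapping (this is not code from the change; index name, IDs and client setup are made up), children carry a join_field value and are routed to the parent's shard:

    from elasticsearch import Elasticsearch

    es = Elasticsearch()                      # assumption: local ES node
    index = "rcae_r_2017_05"                  # hypothetical monthly report index

    # parent document (report group)
    parent = {"type": "report_group", "group_id": "1234",
              "join_field": {"name": "report_group"}}
    es.index(index=index, doc_type="report", id="1234", body=parent)

    # child document (report), routed to the parent's shard
    child = {"type": "report", "report_id": "5678", "group_id": "1234",
             "join_field": {"name": "report", "parent": "1234"}}
    es.index(index=index, doc_type="report", id="5678", routing="1234", body=child)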
261 380 def reindex_reports():
262 381 reports_groups_tables = detect_tables("reports_groups_p_")
263 382 try:
264 Datastores.es.indices.delete("rcae_r*")
383 Datastores.es.indices.delete("rcae_r_*")
265 384 except elasticsearch.exceptions.NotFoundError as e:
266 385 log.error(e)
267 386
268 387 log.info("reindexing report groups")
269 388 i = 0
270 389 task_start = datetime.datetime.now()
271 390 for partition_table in reports_groups_tables:
272 391 conn = DBSession.connection().execution_options(stream_results=True)
273 392 result = conn.execute(partition_table.select())
274 393 while True:
275 394 chunk = result.fetchmany(2000)
276 395 if not chunk:
277 396 break
278 397 es_docs = defaultdict(list)
279 398 for row in chunk:
280 399 i += 1
281 400 item = ReportGroup(**dict(list(row.items())))
282 401 d_range = item.partition_id
283 402 es_docs[d_range].append(item.es_doc())
284 403 if es_docs:
285 404 name = partition_table.name
286 405 log.info("round {}, {}".format(i, name))
287 406 for k, v in es_docs.items():
288 to_update = {"_index": k, "_type": "report_group"}
407 to_update = {"_index": k, "_type": "report"}
289 408 [i.update(to_update) for i in v]
290 409 elasticsearch.helpers.bulk(Datastores.es, v)
291 410
292 411 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
293 412
294 413 i = 0
295 414 log.info("reindexing reports")
296 415 task_start = datetime.datetime.now()
297 416 reports_tables = detect_tables("reports_p_")
298 417 for partition_table in reports_tables:
299 418 conn = DBSession.connection().execution_options(stream_results=True)
300 419 result = conn.execute(partition_table.select())
301 420 while True:
302 421 chunk = result.fetchmany(2000)
303 422 if not chunk:
304 423 break
305 424 es_docs = defaultdict(list)
306 425 for row in chunk:
307 426 i += 1
308 427 item = Report(**dict(list(row.items())))
309 428 d_range = item.partition_id
310 429 es_docs[d_range].append(item.es_doc())
311 430 if es_docs:
312 431 name = partition_table.name
313 432 log.info("round {}, {}".format(i, name))
314 433 for k, v in es_docs.items():
315 434 to_update = {"_index": k, "_type": "report"}
316 435 [i.update(to_update) for i in v]
317 436 elasticsearch.helpers.bulk(Datastores.es, v)
318 437
319 438 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
320 439
321 440 log.info("reindexing reports stats")
322 441 i = 0
323 442 task_start = datetime.datetime.now()
324 443 reports_stats_tables = detect_tables("reports_stats_p_")
325 444 for partition_table in reports_stats_tables:
326 445 conn = DBSession.connection().execution_options(stream_results=True)
327 446 result = conn.execute(partition_table.select())
328 447 while True:
329 448 chunk = result.fetchmany(2000)
330 449 if not chunk:
331 450 break
332 451 es_docs = defaultdict(list)
333 452 for row in chunk:
334 453 rd = dict(list(row.items()))
335 454 # remove legacy columns
336 455 # TODO: remove the column later
337 456 rd.pop("size", None)
338 457 item = ReportStat(**rd)
339 458 i += 1
340 459 d_range = item.partition_id
341 460 es_docs[d_range].append(item.es_doc())
342 461 if es_docs:
343 462 name = partition_table.name
344 463 log.info("round {}, {}".format(i, name))
345 464 for k, v in es_docs.items():
346 to_update = {"_index": k, "_type": "log"}
465 to_update = {"_index": k, "_type": "report"}
347 466 [i.update(to_update) for i in v]
348 467 elasticsearch.helpers.bulk(Datastores.es, v)
349 468
350 469 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
351 470
352 471
353 472 def reindex_logs():
354 473 try:
355 Datastores.es.indices.delete("rcae_l*")
474 Datastores.es.indices.delete("rcae_l_*")
356 475 except elasticsearch.exceptions.NotFoundError as e:
357 476 log.error(e)
358 477
359 478 # logs
360 479 log.info("reindexing logs")
361 480 i = 0
362 481 task_start = datetime.datetime.now()
363 482 log_tables = detect_tables("logs_p_")
364 483 for partition_table in log_tables:
365 484 conn = DBSession.connection().execution_options(stream_results=True)
366 485 result = conn.execute(partition_table.select())
367 486 while True:
368 487 chunk = result.fetchmany(2000)
369 488 if not chunk:
370 489 break
371 490 es_docs = defaultdict(list)
372 491
373 492 for row in chunk:
374 493 i += 1
375 494 item = Log(**dict(list(row.items())))
376 495 d_range = item.partition_id
377 496 es_docs[d_range].append(item.es_doc())
378 497 if es_docs:
379 498 name = partition_table.name
380 499 log.info("round {}, {}".format(i, name))
381 500 for k, v in es_docs.items():
382 501 to_update = {"_index": k, "_type": "log"}
383 502 [i.update(to_update) for i in v]
384 503 elasticsearch.helpers.bulk(Datastores.es, v)
385 504
386 505 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
387 506
388 507
389 508 def reindex_metrics():
390 509 try:
391 Datastores.es.indices.delete("rcae_m*")
510 Datastores.es.indices.delete("rcae_m_*")
392 511 except elasticsearch.exceptions.NotFoundError as e:
393 512 log.error(e)
394 513
395 514 log.info("reindexing applications metrics")
396 515 i = 0
397 516 task_start = datetime.datetime.now()
398 517 metric_tables = detect_tables("metrics_p_")
399 518 for partition_table in metric_tables:
400 519 conn = DBSession.connection().execution_options(stream_results=True)
401 520 result = conn.execute(partition_table.select())
402 521 while True:
403 522 chunk = result.fetchmany(2000)
404 523 if not chunk:
405 524 break
406 525 es_docs = defaultdict(list)
407 526 for row in chunk:
408 527 i += 1
409 528 item = Metric(**dict(list(row.items())))
410 529 d_range = item.partition_id
411 530 es_docs[d_range].append(item.es_doc())
412 531 if es_docs:
413 532 name = partition_table.name
414 533 log.info("round {}, {}".format(i, name))
415 534 for k, v in es_docs.items():
416 535 to_update = {"_index": k, "_type": "log"}
417 536 [i.update(to_update) for i in v]
418 537 elasticsearch.helpers.bulk(Datastores.es, v)
419 538
420 539 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
421 540
422 541
423 542 def reindex_slow_calls():
424 543 try:
425 Datastores.es.indices.delete("rcae_sc*")
544 Datastores.es.indices.delete("rcae_sc_*")
426 545 except elasticsearch.exceptions.NotFoundError as e:
427 546 log.error(e)
428 547
429 548 log.info("reindexing slow calls")
430 549 i = 0
431 550 task_start = datetime.datetime.now()
432 551 slow_calls_tables = detect_tables("slow_calls_p_")
433 552 for partition_table in slow_calls_tables:
434 553 conn = DBSession.connection().execution_options(stream_results=True)
435 554 result = conn.execute(partition_table.select())
436 555 while True:
437 556 chunk = result.fetchmany(2000)
438 557 if not chunk:
439 558 break
440 559 es_docs = defaultdict(list)
441 560 for row in chunk:
442 561 i += 1
443 562 item = SlowCall(**dict(list(row.items())))
444 563 d_range = item.partition_id
445 564 es_docs[d_range].append(item.es_doc())
446 565 if es_docs:
447 566 name = partition_table.name
448 567 log.info("round {}, {}".format(i, name))
449 568 for k, v in es_docs.items():
450 569 to_update = {"_index": k, "_type": "log"}
451 570 [i.update(to_update) for i in v]
452 571 elasticsearch.helpers.bulk(Datastores.es, v)
453 572
454 573 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
455 574
456 575
457 576 if __name__ == "__main__":
458 577 main()
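Reviewer note: every reindex_* function repeats the same stream-in-chunks / bulk-per-partition loop. A generic sketch of that pattern (the helper name and the (index_name, doc) pair shape are illustrative, not part of this script):

    from collections import defaultdict
    import elasticsearch.helpers

    def bulk_index_by_partition(es, items, doc_type="log", chunk=2000):
        """items: iterable of (index_name, es_doc) pairs."""
        buckets = defaultdict(list)
        for index_name, doc in items:
            # annotate each doc with its target index and doc type, as the script does
            action = dict(doc, _index=index_name, _type=doc_type)
            buckets[index_name].append(action)
            if len(buckets[index_name]) >= chunk:
                elasticsearch.helpers.bulk(es, buckets.pop(index_name))
        # flush whatever is left per partition
        for actions in buckets.values():
            elasticsearch.helpers.bulk(es, actions)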