tasks: change sampling rates to something better suited for large amounts of data
@@ -1,662 +1,663 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 import bisect
23 23 import collections
24 24 import math
25 25 from datetime import datetime, timedelta
26 26
27 27 import sqlalchemy as sa
28 28 import pyelasticsearch
29 29
30 30 from celery.utils.log import get_task_logger
31 31 from zope.sqlalchemy import mark_changed
32 32 from pyramid.threadlocal import get_current_request, get_current_registry
33 33 from appenlight.celery import celery
34 34 from appenlight.models.report_group import ReportGroup
35 35 from appenlight.models import DBSession, Datastores
36 36 from appenlight.models.report import Report
37 37 from appenlight.models.log import Log
38 38 from appenlight.models.request_metric import Metric
39 39 from appenlight.models.event import Event
40 40
41 41 from appenlight.models.services.application import ApplicationService
42 42 from appenlight.models.services.event import EventService
43 43 from appenlight.models.services.log import LogService
44 44 from appenlight.models.services.report import ReportService
45 45 from appenlight.models.services.report_group import ReportGroupService
46 46 from appenlight.models.services.user import UserService
47 47 from appenlight.models.tag import Tag
48 48 from appenlight.lib import print_traceback
49 49 from appenlight.lib.utils import parse_proto, in_batches
50 50 from appenlight.lib.ext_json import json
51 51 from appenlight.lib.redis_keys import REDIS_KEYS
52 52 from appenlight.lib.enums import ReportType
53 53
54 54 log = get_task_logger(__name__)
55 55
56 sample_boundries = list(range(100, 10000, 100))
56 sample_boundries = list(range(100, 1000, 100)) + \
57 list(range(1000, 10000, 1000)) + \
58 list(range(10000, 100000, 5000))
57 59
58 60
59 def pick_sample(total_occurences, report_type=1):
61 def pick_sample(total_occurences, report_type=None):
60 62 every = 1.0
61 63 position = bisect.bisect_left(sample_boundries, total_occurences)
62 64 if position > 0:
63 # 404
64 if report_type == 2:
65 if report_type == ReportType.not_found:
65 66 divide = 10.0
66 67 else:
67 68 divide = 100.0
68 69 every = sample_boundries[position - 1] / divide
69 70 return total_occurences % every == 0
70 71
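With the new boundaries, sampling gets progressively sparser as a report group accumulates occurrences. A standalone sketch of the behaviour (same boundary list and logic as pick_sample above; a plain sentinel stands in for ReportType.not_found, whose actual value may differ):

```python
import bisect

NOT_FOUND = object()  # stand-in for ReportType.not_found

sample_boundries = list(range(100, 1000, 100)) + \
                   list(range(1000, 10000, 1000)) + \
                   list(range(10000, 100000, 5000))


def pick_sample(total_occurences, report_type=None):
    # store every report until the first boundary, then only every
    # (previous_boundary / 100)th one, or every (previous_boundary / 10)th
    # for 404-style reports
    every = 1.0
    position = bisect.bisect_left(sample_boundries, total_occurences)
    if position > 0:
        divide = 10.0 if report_type is NOT_FOUND else 100.0
        every = sample_boundries[position - 1] / divide
    return total_occurences % every == 0


for n in (50, 150, 1200, 2505, 20150):
    print(n, pick_sample(n))   # 50/150/1200 sampled, 2505 and 20150 skipped
```

Compared to the old single range(100, 10000, 100) list, the divisor now keeps growing past ten thousand occurrences instead of being capped at 99, so far fewer samples are stored for very noisy groups.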
71 72
72 73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
73 74 def test_exception_task():
74 75 log.error('test celery log', extra={'location': 'celery'})
75 76 log.warning('test celery log', extra={'location': 'celery'})
76 77 raise Exception('Celery exception test')
77 78
78 79
79 80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
80 81 def test_retry_exception_task():
81 82 try:
82 83 import time
83 84
84 85 time.sleep(1.3)
85 86 log.error('test retry celery log', extra={'location': 'celery'})
86 87 log.warning('test retry celery log', extra={'location': 'celery'})
87 88 raise Exception('Celery exception test')
88 89 except Exception as exc:
89 90 test_retry_exception_task.retry(exc=exc)
90 91
91 92
92 93 @celery.task(queue="reports", default_retry_delay=600, max_retries=999)
93 94 def add_reports(resource_id, params, dataset, environ=None, **kwargs):
94 95 proto_version = parse_proto(params.get('protocol_version', ''))
95 96 current_time = datetime.utcnow().replace(second=0, microsecond=0)
96 97 try:
97 98 # we will store ES docs here for single insert
98 99 es_report_docs = {}
99 100 es_report_group_docs = {}
100 101 resource = ApplicationService.by_id(resource_id)
101 102 reports = []
102 103
103 104 if proto_version.major < 1 and proto_version.minor < 5:
104 105 for report_data in dataset:
105 106 report_details = report_data.get('report_details', [])
106 107 for i, detail_data in enumerate(report_details):
107 108 report_data.update(detail_data)
108 109 report_data.pop('report_details')
109 110 traceback = report_data.get('traceback')
110 111 if traceback is None:
111 112 report_data['traceback'] = report_data.get('frameinfo')
112 113 # for 0.3 api
113 114 error = report_data.pop('error_type', '')
114 115 if error:
115 116 report_data['error'] = error
116 117 if proto_version.minor < 4:
117 118 # convert "Unknown" slow reports to
118 119 # '' (from older clients)
119 120 if (report_data['error'] and
120 121 report_data['http_status'] < 500):
121 122 report_data['error'] = ''
122 123 message = report_data.get('message')
123 124 if 'extra' not in report_data:
124 125 report_data['extra'] = []
125 126 if message:
126 127 report_data['extra'] = [('message', message), ]
127 128 reports.append(report_data)
128 129 else:
129 130 reports = dataset
130 131
131 132 tags = []
132 133 es_slow_calls_docs = {}
133 134 es_reports_stats_rows = {}
134 135 for report_data in reports:
135 136 # build report details for later
136 137 added_details = 0
137 138 report = Report()
138 139 report.set_data(report_data, resource, proto_version)
139 140 report._skip_ft_index = True
140 141
141 142 report_group = ReportGroupService.by_hash_and_resource(
142 143 report.resource_id,
143 144 report.grouping_hash
144 145 )
145 146 occurences = report_data.get('occurences', 1)
146 147 if not report_group:
147 148 # total reports will be +1 moment later
148 149 report_group = ReportGroup(grouping_hash=report.grouping_hash,
149 150 occurences=0, total_reports=0,
150 151 last_report=0,
151 152 priority=report.priority,
152 153 error=report.error,
153 154 first_timestamp=report.start_time)
154 155 report_group._skip_ft_index = True
155 156 report_group.report_type = report.report_type
156 157 report.report_group_time = report_group.first_timestamp
157 158 add_sample = pick_sample(report_group.occurences,
158 159 report_type=report_group.report_type)
159 160 if add_sample:
160 161 resource.report_groups.append(report_group)
161 162 report_group.reports.append(report)
162 163 added_details += 1
163 164 DBSession.flush()
164 165 if report.partition_id not in es_report_docs:
165 166 es_report_docs[report.partition_id] = []
166 167 es_report_docs[report.partition_id].append(report.es_doc())
167 168 tags.extend(list(report.tags.items()))
168 169 slow_calls = report.add_slow_calls(report_data, report_group)
169 170 DBSession.flush()
170 171 for s_call in slow_calls:
171 172 if s_call.partition_id not in es_slow_calls_docs:
172 173 es_slow_calls_docs[s_call.partition_id] = []
173 174 es_slow_calls_docs[s_call.partition_id].append(
174 175 s_call.es_doc())
175 176 # try generating new stat rows if needed
176 177 else:
177 178 # required for postprocessing to not fail later
178 179 report.report_group = report_group
179 180
180 181 stat_row = ReportService.generate_stat_rows(
181 182 report, resource, report_group)
182 183 if stat_row.partition_id not in es_reports_stats_rows:
183 184 es_reports_stats_rows[stat_row.partition_id] = []
184 185 es_reports_stats_rows[stat_row.partition_id].append(
185 186 stat_row.es_doc())
186 187
187 188 # see if we should mark 10th occurrence of report
188 189 last_occurences_10 = int(math.floor(report_group.occurences / 10))
189 190 curr_occurences_10 = int(math.floor(
190 191 (report_group.occurences + report.occurences) / 10))
191 192 last_occurences_100 = int(
192 193 math.floor(report_group.occurences / 100))
193 194 curr_occurences_100 = int(math.floor(
194 195 (report_group.occurences + report.occurences) / 100))
195 196 notify_occurences_10 = last_occurences_10 != curr_occurences_10
196 197 notify_occurences_100 = last_occurences_100 != curr_occurences_100
197 198 report_group.occurences = ReportGroup.occurences + occurences
198 199 report_group.last_timestamp = report.start_time
199 200 report_group.summed_duration = ReportGroup.summed_duration + report.duration
200 201 summed_duration = ReportGroup.summed_duration + report.duration
201 202 summed_occurences = ReportGroup.occurences + occurences
202 203 report_group.average_duration = summed_duration / summed_occurences
203 204 report_group.run_postprocessing(report)
204 205 if added_details:
205 206 report_group.total_reports = ReportGroup.total_reports + 1
206 207 report_group.last_report = report.id
207 208 report_group.set_notification_info(notify_10=notify_occurences_10,
208 209 notify_100=notify_occurences_100)
209 210 DBSession.flush()
210 211 report_group.get_report().notify_channel(report_group)
211 212 if report_group.partition_id not in es_report_group_docs:
212 213 es_report_group_docs[report_group.partition_id] = []
213 214 es_report_group_docs[report_group.partition_id].append(
214 215 report_group.es_doc())
215 216
216 217 action = 'REPORT'
217 218 log_msg = '%s: %s %s, client: %s, proto: %s' % (
218 219 action,
219 220 report_data.get('http_status', 'unknown'),
220 221 str(resource),
221 222 report_data.get('client'),
222 223 proto_version)
223 224 log.info(log_msg)
224 225 total_reports = len(dataset)
225 226 key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
226 227 Datastores.redis.incr(key, total_reports)
227 228 Datastores.redis.expire(key, 3600 * 24)
228 229 key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
229 230 resource_id, current_time)
230 231 Datastores.redis.incr(key, total_reports)
231 232 Datastores.redis.expire(key, 3600 * 24)
232 233
233 234 add_reports_es(es_report_group_docs, es_report_docs)
234 235 add_reports_slow_calls_es(es_slow_calls_docs)
235 236 add_reports_stats_rows_es(es_reports_stats_rows)
236 237 return True
237 238 except Exception as exc:
238 239 print_traceback(log)
239 240 add_reports.retry(exc=exc)
240 241
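A note on the notify_occurences_10 / notify_occurences_100 math above: the floor-division comparison fires exactly when the incoming batch pushes the group total across the next multiple of 10 or 100. A minimal sketch (the helper name is illustrative, not part of the model API):

```python
import math


def crossed_threshold(current_total, new_occurences, step):
    """Return True when adding new_occurences pushes current_total
    across the next multiple of `step` (10 or 100 in add_reports)."""
    last_bucket = int(math.floor(current_total / step))
    curr_bucket = int(math.floor((current_total + new_occurences) / step))
    return last_bucket != curr_bucket


# 95 -> 103 crosses the 100 boundary, 101 -> 104 does not
assert crossed_threshold(95, 8, 100)
assert not crossed_threshold(101, 3, 100)
```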
241 242
242 243 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
243 244 def add_reports_es(report_group_docs, report_docs):
244 245 for k, v in report_group_docs.items():
245 246 Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
246 247 for k, v in report_docs.items():
247 248 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
248 249 parent_field='_parent')
249 250
250 251
251 252 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
252 253 def add_reports_slow_calls_es(es_docs):
253 254 for k, v in es_docs.items():
254 255 Datastores.es.bulk_index(k, 'log', v)
255 256
256 257
257 258 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
258 259 def add_reports_stats_rows_es(es_docs):
259 260 for k, v in es_docs.items():
260 261 Datastores.es.bulk_index(k, 'log', v)
261 262
262 263
263 264 @celery.task(queue="logs", default_retry_delay=600, max_retries=999)
264 265 def add_logs(resource_id, request, dataset, environ=None, **kwargs):
265 266 proto_version = request.get('protocol_version')
266 267 current_time = datetime.utcnow().replace(second=0, microsecond=0)
267 268
268 269 try:
269 270 es_docs = collections.defaultdict(list)
270 271 application = ApplicationService.by_id(resource_id)
271 272 ns_pairs = []
272 273 for entry in dataset:
273 274 # gather pk and ns so we can remove older versions of row later
274 275 if entry['primary_key'] is not None:
275 276 ns_pairs.append({"pk": entry['primary_key'],
276 277 "ns": entry['namespace']})
277 278 log_entry = Log()
278 279 log_entry.set_data(entry, resource=application)
279 280 log_entry._skip_ft_index = True
280 281 application.logs.append(log_entry)
281 282 DBSession.flush()
282 283 # insert non pk rows first
283 284 if entry['primary_key'] is None:
284 285 es_docs[log_entry.partition_id].append(log_entry.es_doc())
285 286
286 287 # 2nd pass to delete all log entries from db for same pk/ns pair
287 288 if ns_pairs:
288 289 ids_to_delete = []
289 290 es_docs = collections.defaultdict(list)
290 291 es_docs_to_delete = collections.defaultdict(list)
291 292 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
292 293 list_of_pairs=ns_pairs)
293 294 log_dict = {}
294 295 for log_entry in found_pkey_logs:
295 296 log_key = (log_entry.primary_key, log_entry.namespace)
296 297 if log_key not in log_dict:
297 298 log_dict[log_key] = []
298 299 log_dict[log_key].append(log_entry)
299 300
300 301 for ns, entry_list in log_dict.items():
301 302 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
302 303 # newest row needs to be indexed in es
303 304 log_entry = entry_list[-1]
304 305 # delete everything from pg and ES, leave the last row in pg
305 306 for e in entry_list[:-1]:
306 307 ids_to_delete.append(e.log_id)
307 308 es_docs_to_delete[e.partition_id].append(e.delete_hash)
308 309
309 310 es_docs_to_delete[log_entry.partition_id].append(
310 311 log_entry.delete_hash)
311 312
312 313 es_docs[log_entry.partition_id].append(log_entry.es_doc())
313 314
314 315 if ids_to_delete:
315 316 query = DBSession.query(Log).filter(
316 317 Log.log_id.in_(ids_to_delete))
317 318 query.delete(synchronize_session=False)
318 319 if es_docs_to_delete:
319 320 # batch this to avoid problems with default ES bulk limits
320 321 for es_index in es_docs_to_delete.keys():
321 322 for batch in in_batches(es_docs_to_delete[es_index], 20):
322 323 query = {'terms': {'delete_hash': batch}}
323 324
324 325 try:
325 326 Datastores.es.delete_by_query(
326 327 es_index, 'log', query)
327 328 except pyelasticsearch.ElasticHttpNotFoundError as exc:
328 329 log.error(exc)
329 330
330 331 total_logs = len(dataset)
331 332
332 333 log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
333 334 str(application),
334 335 total_logs,
335 336 proto_version)
336 337 log.info(log_msg)
337 338 # mark_changed(session)
338 339 key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
339 340 Datastores.redis.incr(key, total_logs)
340 341 Datastores.redis.expire(key, 3600 * 24)
341 342 key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
342 343 resource_id, current_time)
343 344 Datastores.redis.incr(key, total_logs)
344 345 Datastores.redis.expire(key, 3600 * 24)
345 346 add_logs_es(es_docs)
346 347 return True
347 348 except Exception as exc:
348 349 print_traceback(log)
349 350 add_logs.retry(exc=exc)
350 351
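The second pass in add_logs keeps only the newest row per (primary_key, namespace) pair and queues everything older for deletion from both Postgres and ES. A condensed sketch of that grouping step, using plain dicts instead of Log models (names illustrative):

```python
import collections


def newest_per_key(entries):
    """Group log entries by (primary_key, namespace) and return, per group,
    the newest entry plus the ids of every older entry to delete."""
    grouped = collections.defaultdict(list)
    for entry in entries:
        grouped[(entry['primary_key'], entry['namespace'])].append(entry)

    keep, to_delete = [], []
    for key, entry_list in grouped.items():
        entry_list = sorted(entry_list, key=lambda e: e['timestamp'])
        keep.append(entry_list[-1])                        # newest stays indexed
        to_delete.extend(e['log_id'] for e in entry_list[:-1])
    return keep, to_delete


entries = [
    {'primary_key': 'job-1', 'namespace': 'worker', 'timestamp': 1, 'log_id': 10},
    {'primary_key': 'job-1', 'namespace': 'worker', 'timestamp': 5, 'log_id': 11},
]
keep, to_delete = newest_per_key(entries)
assert [e['log_id'] for e in keep] == [11] and to_delete == [10]
```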
351 352
352 353 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
353 354 def add_logs_es(es_docs):
354 355 for k, v in es_docs.items():
355 356 Datastores.es.bulk_index(k, 'log', v)
356 357
357 358
358 359 @celery.task(queue="metrics", default_retry_delay=600, max_retries=999)
359 360 def add_metrics(resource_id, request, dataset, proto_version):
360 361 current_time = datetime.utcnow().replace(second=0, microsecond=0)
361 362 try:
362 363 application = ApplicationService.by_id_cached()(resource_id)
363 364 application = DBSession.merge(application, load=False)
364 365 es_docs = []
365 366 rows = []
366 367 for metric in dataset:
367 368 tags = dict(metric['tags'])
368 369 server_n = tags.get('server_name', metric['server_name']).lower()
369 370 tags['server_name'] = server_n or 'unknown'
370 371 new_metric = Metric(
371 372 timestamp=metric['timestamp'],
372 373 resource_id=application.resource_id,
373 374 namespace=metric['namespace'],
374 375 tags=tags)
375 376 rows.append(new_metric)
376 377 es_docs.append(new_metric.es_doc())
377 378 session = DBSession()
378 379 session.bulk_save_objects(rows)
379 380 session.flush()
380 381
381 382 action = 'METRICS'
382 383 metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
383 384 action,
384 385 str(application),
385 386 len(dataset),
386 387 proto_version
387 388 )
388 389 log.info(metrics_msg)
389 390
390 391 mark_changed(session)
391 392 key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
392 393 Datastores.redis.incr(key, len(rows))
393 394 Datastores.redis.expire(key, 3600 * 24)
394 395 key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
395 396 resource_id, current_time)
396 397 Datastores.redis.incr(key, len(rows))
397 398 Datastores.redis.expire(key, 3600 * 24)
398 399 add_metrics_es(es_docs)
399 400 return True
400 401 except Exception as exc:
401 402 print_traceback(log)
402 403 add_metrics.retry(exc=exc)
403 404
404 405
405 406 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
406 407 def add_metrics_es(es_docs):
407 408 for doc in es_docs:
408 409 partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
409 410 Datastores.es.index(partition, 'log', doc)
410 411
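add_metrics_es routes each document to a per-day index derived from its timestamp; a quick illustration of the naming scheme (the helper function is hypothetical, the prefix comes from the code above):

```python
from datetime import datetime


def metric_partition(timestamp):
    # mirrors the 'rcae_m_%Y_%m_%d' pattern used in add_metrics_es
    return 'rcae_m_%s' % timestamp.strftime('%Y_%m_%d')


print(metric_partition(datetime(2016, 5, 17)))  # -> rcae_m_2016_05_17
```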
411 412
412 413 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
413 414 def check_user_report_notifications(resource_id):
414 415 since_when = datetime.utcnow()
415 416 try:
416 417 request = get_current_request()
417 418 application = ApplicationService.by_id(resource_id)
418 419 if not application:
419 420 return
420 421 error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
421 422 ReportType.error, resource_id)
422 423 slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
423 424 ReportType.slow, resource_id)
424 425 error_group_ids = Datastores.redis.smembers(error_key)
425 426 slow_group_ids = Datastores.redis.smembers(slow_key)
426 427 Datastores.redis.delete(error_key)
427 428 Datastores.redis.delete(slow_key)
428 429 err_gids = [int(g_id) for g_id in error_group_ids]
429 430 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
430 431 group_ids = err_gids + slow_gids
431 432 occurence_dict = {}
432 433 for g_id in group_ids:
433 434 key = REDIS_KEYS['counters']['report_group_occurences'].format(
434 435 g_id)
435 436 val = Datastores.redis.get(key)
436 437 Datastores.redis.delete(key)
437 438 if val:
438 439 occurence_dict[g_id] = int(val)
439 440 else:
440 441 occurence_dict[g_id] = 1
441 442 report_groups = ReportGroupService.by_ids(group_ids)
442 443 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
443 444
444 445 ApplicationService.check_for_groups_alert(
445 446 application, 'alert', report_groups=report_groups,
446 447 occurence_dict=occurence_dict)
447 448 users = set([p.user for p in application.users_for_perm('view')])
448 449 report_groups = report_groups.all()
449 450 for user in users:
450 451 UserService.report_notify(user, request, application,
451 452 report_groups=report_groups,
452 453 occurence_dict=occurence_dict)
453 454 for group in report_groups:
454 455 # marks report_groups as notified
455 456 if not group.notified:
456 457 group.notified = True
457 458 except Exception as exc:
458 459 print_traceback(log)
459 460 raise
460 461
461 462
462 463 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
463 464 def check_alerts(resource_id):
464 465 since_when = datetime.utcnow()
465 466 try:
466 467 request = get_current_request()
467 468 application = ApplicationService.by_id(resource_id)
468 469 if not application:
469 470 return
470 471 error_key = REDIS_KEYS[
471 472 'reports_to_notify_per_type_per_app_alerting'].format(
472 473 ReportType.error, resource_id)
473 474 slow_key = REDIS_KEYS[
474 475 'reports_to_notify_per_type_per_app_alerting'].format(
475 476 ReportType.slow, resource_id)
476 477 error_group_ids = Datastores.redis.smembers(error_key)
477 478 slow_group_ids = Datastores.redis.smembers(slow_key)
478 479 Datastores.redis.delete(error_key)
479 480 Datastores.redis.delete(slow_key)
480 481 err_gids = [int(g_id) for g_id in error_group_ids]
481 482 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
482 483 group_ids = err_gids + slow_gids
483 484 occurence_dict = {}
484 485 for g_id in group_ids:
485 486 key = REDIS_KEYS['counters'][
486 487 'report_group_occurences_alerting'].format(
487 488 g_id)
488 489 val = Datastores.redis.get(key)
489 490 Datastores.redis.delete(key)
490 491 if val:
491 492 occurence_dict[g_id] = int(val)
492 493 else:
493 494 occurence_dict[g_id] = 1
494 495 report_groups = ReportGroupService.by_ids(group_ids)
495 496 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
496 497
497 498 ApplicationService.check_for_groups_alert(
498 499 application, 'alert', report_groups=report_groups,
499 500 occurence_dict=occurence_dict, since_when=since_when)
500 501 except Exception as exc:
501 502 print_traceback(log)
502 503 raise
503 504
504 505
505 506 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
506 507 def close_alerts():
507 508 log.warning('Checking alerts')
508 509 since_when = datetime.utcnow()
509 510 try:
510 511 event_types = [Event.types['error_report_alert'],
511 512 Event.types['slow_report_alert'], ]
512 513 statuses = [Event.statuses['active']]
513 514 # get events older than 5 min
514 515 events = EventService.by_type_and_status(
515 516 event_types,
516 517 statuses,
517 518 older_than=(since_when - timedelta(minutes=5)))
518 519 for event in events:
519 520 # see if we can close them
520 521 event.validate_or_close(
521 522 since_when=(since_when - timedelta(minutes=1)))
522 523 except Exception as exc:
523 524 print_traceback(log)
524 525 raise
525 526
526 527
527 528 @celery.task(queue="default", default_retry_delay=600, max_retries=999)
528 529 def update_tag_counter(tag_name, tag_value, count):
529 530 try:
530 531 query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
531 532 sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
532 533 sa.types.TEXT))
533 534 query.update({'times_seen': Tag.times_seen + count,
534 535 'last_timestamp': datetime.utcnow()},
535 536 synchronize_session=False)
536 537 session = DBSession()
537 538 mark_changed(session)
538 539 return True
539 540 except Exception as exc:
540 541 print_traceback(log)
541 542 update_tag_counter.retry(exc=exc)
542 543
543 544
544 545 @celery.task(queue="default")
545 546 def update_tag_counters():
546 547 """
547 548 Sets task to update counters for application tags
548 549 """
549 550 tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
550 551 Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
551 552 c = collections.Counter(tags)
552 553 for t_json, count in c.items():
553 554 tag_info = json.loads(t_json)
554 555 update_tag_counter.delay(tag_info[0], tag_info[1], count)
555 556
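update_tag_counters collapses duplicate tags before scheduling per-tag DB updates: the Redis list holds JSON-encoded (name, value) pairs, and Counter folds repeats into a single delayed task per pair. A small illustration with hypothetical tag payloads:

```python
import collections
import json

raw_tags = [
    json.dumps(['browser', 'firefox']),
    json.dumps(['browser', 'firefox']),
    json.dumps(['os', 'linux']),
]
counts = collections.Counter(raw_tags)
for t_json, count in counts.items():
    name, value = json.loads(t_json)
    # in the task this becomes update_tag_counter.delay(name, value, count)
    print(name, value, count)
```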
556 557
557 558 @celery.task(queue="default")
558 559 def daily_digest():
559 560 """
560 561 Sends daily digest with top 50 error reports
561 562 """
562 563 request = get_current_request()
563 564 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
564 565 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
565 566 since_when = datetime.utcnow() - timedelta(hours=8)
566 567 log.warning('Generating daily digests')
567 568 for resource_id in apps:
568 569 resource_id = resource_id.decode('utf8')
569 570 end_date = datetime.utcnow().replace(microsecond=0, second=0)
570 571 filter_settings = {'resource': [resource_id],
571 572 'tags': [{'name': 'type',
572 573 'value': ['error'], 'op': None}],
573 574 'type': 'error', 'start_date': since_when,
574 575 'end_date': end_date}
575 576
576 577 reports = ReportGroupService.get_trending(
577 578 request, filter_settings=filter_settings, limit=50)
578 579
579 580 application = ApplicationService.by_id(resource_id)
580 581 if application:
581 582 users = set([p.user for p in application.users_for_perm('view')])
582 583 for user in users:
583 584 user.send_digest(request, application, reports=reports,
584 585 since_when=since_when)
585 586
586 587
587 588 @celery.task(queue="default")
588 589 def notifications_reports():
589 590 """
590 591 Loop that checks redis for info and then issues new tasks to celery to
591 592 issue notifications
592 593 """
593 594 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
594 595 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
595 596 for app in apps:
596 597 log.warning('Notify for app: %s' % app)
597 598 check_user_report_notifications.delay(app.decode('utf8'))
598 599
599 600 @celery.task(queue="default")
600 601 def alerting_reports():
601 602 """
602 603 Loop that checks redis for info and then issues new tasks to celery to
603 604 perform the following:
604 605 - which applications should have new alerts opened
605 606 """
606 607
607 608 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
608 609 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
609 610 for app in apps:
610 611 log.warning('Notify for app: %s' % app)
611 612 check_alerts.delay(app.decode('utf8'))
612 613
613 614
614 615 @celery.task(queue="default", soft_time_limit=3600 * 4,
615 616 hard_time_limit=3600 * 4, max_retries=999)
616 617 def logs_cleanup(resource_id, filter_settings):
617 618 request = get_current_request()
618 619 request.tm.begin()
619 620 es_query = {
620 621 "_source": False,
621 622 "size": 5000,
622 623 "query": {
623 624 "filtered": {
624 625 "filter": {
625 626 "and": [{"term": {"resource_id": resource_id}}]
626 627 }
627 628 }
628 629 }
629 630 }
630 631
631 632 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
632 633 if filter_settings['namespace']:
633 634 query = query.filter(Log.namespace == filter_settings['namespace'][0])
634 635 es_query['query']['filtered']['filter']['and'].append(
635 636 {"term": {"namespace": filter_settings['namespace'][0]}}
636 637 )
637 638 query.delete(synchronize_session=False)
638 639 request.tm.commit()
639 640 result = request.es_conn.search(es_query, index='rcae_l_*',
640 641 doc_type='log', es_scroll='1m',
641 642 es_search_type='scan')
642 643 scroll_id = result['_scroll_id']
643 644 while True:
644 645 log.warning('log_cleanup, app:{} ns:{} batch'.format(
645 646 resource_id,
646 647 filter_settings['namespace']
647 648 ))
648 649 es_docs_to_delete = []
649 650 result = request.es_conn.send_request(
650 651 'POST', ['_search', 'scroll'],
651 652 body=scroll_id, query_params={"scroll": '1m'})
652 653 scroll_id = result['_scroll_id']
653 654 if not result['hits']['hits']:
654 655 break
655 656 for doc in result['hits']['hits']:
656 657 es_docs_to_delete.append({"id": doc['_id'],
657 658 "index": doc['_index']})
658 659
659 660 for batch in in_batches(es_docs_to_delete, 10):
660 661 Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
661 662 **to_del)
662 663 for to_del in batch])
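Both add_logs and logs_cleanup lean on in_batches (from appenlight.lib.utils) to stay under ES bulk/delete request limits. Its implementation is not shown in this diff; a minimal stand-in with the same call shape would be:

```python
def in_batches(seq, size):
    """Yield consecutive chunks of at most `size` items from `seq`
    (a guessed, minimal equivalent of appenlight.lib.utils.in_batches)."""
    for start in range(0, len(seq), size):
        yield seq[start:start + size]


print(list(in_batches(list(range(7)), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```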