logs: better log message
ergo -
@@ -1,634 +1,635 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 import bisect
23 23 import collections
24 24 import math
25 25 from datetime import datetime, timedelta
26 26
27 27 import sqlalchemy as sa
28 28 import pyelasticsearch
29 29
30 30 from celery.utils.log import get_task_logger
31 31 from zope.sqlalchemy import mark_changed
32 32 from pyramid.threadlocal import get_current_request, get_current_registry
33 33 from appenlight.celery import celery
34 34 from appenlight.models.report_group import ReportGroup
35 35 from appenlight.models import DBSession, Datastores
36 36 from appenlight.models.report import Report
37 37 from appenlight.models.log import Log
38 38 from appenlight.models.metric import Metric
39 39 from appenlight.models.event import Event
40 40
41 41 from appenlight.models.services.application import ApplicationService
42 42 from appenlight.models.services.event import EventService
43 43 from appenlight.models.services.log import LogService
44 44 from appenlight.models.services.report import ReportService
45 45 from appenlight.models.services.report_group import ReportGroupService
46 46 from appenlight.models.services.user import UserService
47 47 from appenlight.models.tag import Tag
48 48 from appenlight.lib import print_traceback
49 49 from appenlight.lib.utils import parse_proto, in_batches
50 50 from appenlight.lib.ext_json import json
51 51 from appenlight.lib.redis_keys import REDIS_KEYS
52 52 from appenlight.lib.enums import ReportType
53 53
54 54 log = get_task_logger(__name__)
55 55
56 56 sample_boundries = list(range(100, 1000, 100)) + \
57 57 list(range(1000, 10000, 1000)) + \
58 58 list(range(10000, 100000, 5000))
59 59
60 60
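# Returns True when this occurrence should be stored as a full sample.
# Sampling becomes sparser as the group's occurrence count crosses the
# boundaries above; not_found (404) reports are sampled roughly 10x less
# often than other report types.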
61 61 def pick_sample(total_occurences, report_type=None):
62 62 every = 1.0
63 63 position = bisect.bisect_left(sample_boundries, total_occurences)
64 64 if position > 0:
65 65 if report_type == ReportType.not_found:
66 66 divide = 10.0
67 67 else:
68 68 divide = 100.0
69 69 every = sample_boundries[position - 1] / divide
70 70 return total_occurences % every == 0
71 71
72 72
73 73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
74 74 def test_exception_task():
75 75 log.error('test celery log', extra={'location': 'celery'})
76 76 log.warning('test celery log', extra={'location': 'celery'})
77 77 raise Exception('Celery exception test')
78 78
79 79
80 80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
81 81 def test_retry_exception_task():
82 82 try:
83 83 import time
84 84
85 85 time.sleep(1.3)
86 86 log.error('test retry celery log', extra={'location': 'celery'})
87 87 log.warning('test retry celery log', extra={'location': 'celery'})
88 88 raise Exception('Celery exception test')
89 89 except Exception as exc:
90 90 test_retry_exception_task.retry(exc=exc)
91 91
92 92
93 93 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
94 94 def add_reports(resource_id, params, dataset, environ=None, **kwargs):
95 95 proto_version = parse_proto(params.get('protocol_version', ''))
96 96 current_time = datetime.utcnow().replace(second=0, microsecond=0)
97 97 try:
98 98 # we will store ES docs here for a single bulk insert
99 99 es_report_docs = {}
100 100 es_report_group_docs = {}
101 101 resource = ApplicationService.by_id(resource_id)
102 102
103 103 tags = []
104 104 es_slow_calls_docs = {}
105 105 es_reports_stats_rows = {}
106 106 for report_data in dataset:
107 107 # build report details for later
108 108 added_details = 0
109 109 report = Report()
110 110 report.set_data(report_data, resource, proto_version)
111 111 report._skip_ft_index = True
112 112
113 113 report_group = ReportGroupService.by_hash_and_resource(
114 114 report.resource_id,
115 115 report.grouping_hash
116 116 )
117 117 occurences = report_data.get('occurences', 1)
118 118 if not report_group:
119 119 # total_reports will be incremented by 1 a moment later
120 120 report_group = ReportGroup(grouping_hash=report.grouping_hash,
121 121 occurences=0, total_reports=0,
122 122 last_report=0,
123 123 priority=report.priority,
124 124 error=report.error,
125 125 first_timestamp=report.start_time)
126 126 report_group._skip_ft_index = True
127 127 report_group.report_type = report.report_type
128 128 report.report_group_time = report_group.first_timestamp
129 129 add_sample = pick_sample(report_group.occurences,
130 130 report_type=report_group.report_type)
131 131 if add_sample:
132 132 resource.report_groups.append(report_group)
133 133 report_group.reports.append(report)
134 134 added_details += 1
135 135 DBSession.flush()
136 136 if report.partition_id not in es_report_docs:
137 137 es_report_docs[report.partition_id] = []
138 138 es_report_docs[report.partition_id].append(report.es_doc())
139 139 tags.extend(list(report.tags.items()))
140 140 slow_calls = report.add_slow_calls(report_data, report_group)
141 141 DBSession.flush()
142 142 for s_call in slow_calls:
143 143 if s_call.partition_id not in es_slow_calls_docs:
144 144 es_slow_calls_docs[s_call.partition_id] = []
145 145 es_slow_calls_docs[s_call.partition_id].append(
146 146 s_call.es_doc())
147 147 # try generating new stat rows if needed
148 148 else:
149 149 # required so that postprocessing does not fail later
150 150 report.report_group = report_group
151 151
152 152 stat_row = ReportService.generate_stat_rows(
153 153 report, resource, report_group)
154 154 if stat_row.partition_id not in es_reports_stats_rows:
155 155 es_reports_stats_rows[stat_row.partition_id] = []
156 156 es_reports_stats_rows[stat_row.partition_id].append(
157 157 stat_row.es_doc())
158 158
159 159 # see if we should mark the 10th occurrence of the report
160 160 last_occurences_10 = int(math.floor(report_group.occurences / 10))
161 161 curr_occurences_10 = int(math.floor(
162 162 (report_group.occurences + report.occurences) / 10))
163 163 last_occurences_100 = int(
164 164 math.floor(report_group.occurences / 100))
165 165 curr_occurences_100 = int(math.floor(
166 166 (report_group.occurences + report.occurences) / 100))
167 167 notify_occurences_10 = last_occurences_10 != curr_occurences_10
168 168 notify_occurences_100 = last_occurences_100 != curr_occurences_100
169 169 report_group.occurences = ReportGroup.occurences + occurences
170 170 report_group.last_timestamp = report.start_time
171 171 report_group.summed_duration = ReportGroup.summed_duration + report.duration
172 172 summed_duration = ReportGroup.summed_duration + report.duration
173 173 summed_occurences = ReportGroup.occurences + occurences
174 174 report_group.average_duration = summed_duration / summed_occurences
175 175 report_group.run_postprocessing(report)
176 176 if added_details:
177 177 report_group.total_reports = ReportGroup.total_reports + 1
178 178 report_group.last_report = report.id
179 179 report_group.set_notification_info(notify_10=notify_occurences_10,
180 180 notify_100=notify_occurences_100)
181 181 DBSession.flush()
182 182 report_group.get_report().notify_channel(report_group)
183 183 if report_group.partition_id not in es_report_group_docs:
184 184 es_report_group_docs[report_group.partition_id] = []
185 185 es_report_group_docs[report_group.partition_id].append(
186 186 report_group.es_doc())
187 187
188 188 action = 'REPORT'
189 189 log_msg = '%s: %s %s, client: %s, proto: %s' % (
190 190 action,
191 191 report_data.get('http_status', 'unknown'),
192 192 str(resource),
193 193 report_data.get('client'),
194 194 proto_version)
195 195 log.info(log_msg)
196 196 total_reports = len(dataset)
197 197 key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
198 198 Datastores.redis.incr(key, total_reports)
199 199 Datastores.redis.expire(key, 3600 * 24)
200 200 key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
201 201 resource_id, current_time)
202 202 Datastores.redis.incr(key, total_reports)
203 203 Datastores.redis.expire(key, 3600 * 24)
204 204
205 205 add_reports_es(es_report_group_docs, es_report_docs)
206 206 add_reports_slow_calls_es(es_slow_calls_docs)
207 207 add_reports_stats_rows_es(es_reports_stats_rows)
208 208 return True
209 209 except Exception as exc:
210 210 print_traceback(log)
211 211 add_reports.retry(exc=exc)
212 212
213 213
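# The three ES tasks below receive dicts keyed by partition (index) name
# and bulk-index the collected documents into Elasticsearch.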
214 214 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
215 215 def add_reports_es(report_group_docs, report_docs):
216 216 for k, v in report_group_docs.items():
217 217 Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
218 218 for k, v in report_docs.items():
219 219 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
220 220 parent_field='_parent')
221 221
222 222
223 223 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
224 224 def add_reports_slow_calls_es(es_docs):
225 225 for k, v in es_docs.items():
226 226 Datastores.es.bulk_index(k, 'log', v)
227 227
228 228
229 229 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
230 230 def add_reports_stats_rows_es(es_docs):
231 231 for k, v in es_docs.items():
232 232 Datastores.es.bulk_index(k, 'log', v)
233 233
234 234
235 235 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
236 236 def add_logs(resource_id, request, dataset, environ=None, **kwargs):
237 237 proto_version = request.get('protocol_version')
238 238 current_time = datetime.utcnow().replace(second=0, microsecond=0)
239 239
240 240 try:
241 241 es_docs = collections.defaultdict(list)
242 242 application = ApplicationService.by_id(resource_id)
243 243 ns_pairs = []
244 244 for entry in dataset:
245 245 # gather pk and ns so we can remove older versions of row later
246 246 if entry['primary_key'] is not None:
247 247 ns_pairs.append({"pk": entry['primary_key'],
248 248 "ns": entry['namespace']})
249 249 log_entry = Log()
250 250 log_entry.set_data(entry, resource=application)
251 251 log_entry._skip_ft_index = True
252 252 application.logs.append(log_entry)
253 253 DBSession.flush()
254 254 # queue ES docs for non-pk rows right away; pk rows are handled in the 2nd pass below
255 255 if entry['primary_key'] is None:
256 256 es_docs[log_entry.partition_id].append(log_entry.es_doc())
257 257
258 258 # 2nd pass to delete all log entries from db for the same pk/ns pair
259 259 if ns_pairs:
260 260 ids_to_delete = []
261 261 es_docs = collections.defaultdict(list)
262 262 es_docs_to_delete = collections.defaultdict(list)
263 263 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
264 264 list_of_pairs=ns_pairs)
265 265 log_dict = {}
266 266 for log_entry in found_pkey_logs:
267 267 log_key = (log_entry.primary_key, log_entry.namespace)
268 268 if log_key not in log_dict:
269 269 log_dict[log_key] = []
270 270 log_dict[log_key].append(log_entry)
271 271
272 272 for ns, entry_list in log_dict.items():
273 273 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
274 274 # newest row needs to be indexed in es
275 275 log_entry = entry_list[-1]
276 276 # delete everything from pg and ES, leave the last row in pg
277 277 for e in entry_list[:-1]:
278 278 ids_to_delete.append(e.log_id)
279 279 es_docs_to_delete[e.partition_id].append(e.delete_hash)
280 280
281 281 es_docs_to_delete[log_entry.partition_id].append(
282 282 log_entry.delete_hash)
283 283
284 284 es_docs[log_entry.partition_id].append(log_entry.es_doc())
285 285
286 286 if ids_to_delete:
287 287 query = DBSession.query(Log).filter(
288 288 Log.log_id.in_(ids_to_delete))
289 289 query.delete(synchronize_session=False)
290 290 if es_docs_to_delete:
291 291 # batch this to avoid problems with default ES bulk limits
292 292 for es_index in es_docs_to_delete.keys():
293 293 for batch in in_batches(es_docs_to_delete[es_index], 20):
294 294 query = {'terms': {'delete_hash': batch}}
295 295
296 296 try:
297 297 Datastores.es.delete_by_query(
298 298 es_index, 'log', query)
299 299 except pyelasticsearch.ElasticHttpNotFoundError as exc:
300 log.error(exc)
300 msg = 'skipping index {}'.format(es_index)
301 log.info(msg)
301 302
302 303 total_logs = len(dataset)
303 304
304 305 log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
305 306 str(application),
306 307 total_logs,
307 308 proto_version)
308 309 log.info(log_msg)
309 310 # mark_changed(session)
310 311 key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
311 312 Datastores.redis.incr(key, total_logs)
312 313 Datastores.redis.expire(key, 3600 * 24)
313 314 key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
314 315 resource_id, current_time)
315 316 Datastores.redis.incr(key, total_logs)
316 317 Datastores.redis.expire(key, 3600 * 24)
317 318 add_logs_es(es_docs)
318 319 return True
319 320 except Exception as exc:
320 321 print_traceback(log)
321 322 add_logs.retry(exc=exc)
322 323
323 324
324 325 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
325 326 def add_logs_es(es_docs):
326 327 for k, v in es_docs.items():
327 328 Datastores.es.bulk_index(k, 'log', v)
328 329
329 330
330 331 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
331 332 def add_metrics(resource_id, request, dataset, proto_version):
332 333 current_time = datetime.utcnow().replace(second=0, microsecond=0)
333 334 try:
334 335 application = ApplicationService.by_id_cached()(resource_id)
335 336 application = DBSession.merge(application, load=False)
336 337 es_docs = []
337 338 rows = []
338 339 for metric in dataset:
339 340 tags = dict(metric['tags'])
340 341 server_n = tags.get('server_name', metric['server_name']).lower()
341 342 tags['server_name'] = server_n or 'unknown'
342 343 new_metric = Metric(
343 344 timestamp=metric['timestamp'],
344 345 resource_id=application.resource_id,
345 346 namespace=metric['namespace'],
346 347 tags=tags)
347 348 rows.append(new_metric)
348 349 es_docs.append(new_metric.es_doc())
349 350 session = DBSession()
350 351 session.bulk_save_objects(rows)
351 352 session.flush()
352 353
353 354 action = 'METRICS'
354 355 metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
355 356 action,
356 357 str(application),
357 358 len(dataset),
358 359 proto_version
359 360 )
360 361 log.info(metrics_msg)
361 362
362 363 mark_changed(session)
363 364 key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
364 365 Datastores.redis.incr(key, len(rows))
365 366 Datastores.redis.expire(key, 3600 * 24)
366 367 key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
367 368 resource_id, current_time)
368 369 Datastores.redis.incr(key, len(rows))
369 370 Datastores.redis.expire(key, 3600 * 24)
370 371 add_metrics_es(es_docs)
371 372 return True
372 373 except Exception as exc:
373 374 print_traceback(log)
374 375 add_metrics.retry(exc=exc)
375 376
376 377
377 378 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
378 379 def add_metrics_es(es_docs):
379 380 for doc in es_docs:
380 381 partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
381 382 Datastores.es.index(partition, 'log', doc)
382 383
383 384
384 385 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
385 386 def check_user_report_notifications(resource_id):
386 387 since_when = datetime.utcnow()
387 388 try:
388 389 request = get_current_request()
389 390 application = ApplicationService.by_id(resource_id)
390 391 if not application:
391 392 return
392 393 error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
393 394 ReportType.error, resource_id)
394 395 slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
395 396 ReportType.slow, resource_id)
396 397 error_group_ids = Datastores.redis.smembers(error_key)
397 398 slow_group_ids = Datastores.redis.smembers(slow_key)
398 399 Datastores.redis.delete(error_key)
399 400 Datastores.redis.delete(slow_key)
400 401 err_gids = [int(g_id) for g_id in error_group_ids]
401 402 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
402 403 group_ids = err_gids + slow_gids
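# read and reset the per-group occurrence counters accumulated in Redis
# since the last notification run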
403 404 occurence_dict = {}
404 405 for g_id in group_ids:
405 406 key = REDIS_KEYS['counters']['report_group_occurences'].format(
406 407 g_id)
407 408 val = Datastores.redis.get(key)
408 409 Datastores.redis.delete(key)
409 410 if val:
410 411 occurence_dict[g_id] = int(val)
411 412 else:
412 413 occurence_dict[g_id] = 1
413 414 report_groups = ReportGroupService.by_ids(group_ids)
414 415 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
415 416
416 417 ApplicationService.check_for_groups_alert(
417 418 application, 'alert', report_groups=report_groups,
418 419 occurence_dict=occurence_dict)
419 420 users = set([p.user for p in application.users_for_perm('view')])
420 421 report_groups = report_groups.all()
421 422 for user in users:
422 423 UserService.report_notify(user, request, application,
423 424 report_groups=report_groups,
424 425 occurence_dict=occurence_dict)
425 426 for group in report_groups:
426 427 # marks report_groups as notified
427 428 if not group.notified:
428 429 group.notified = True
429 430 except Exception as exc:
430 431 print_traceback(log)
431 432 raise
432 433
433 434
434 435 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
435 436 def check_alerts(resource_id):
436 437 since_when = datetime.utcnow()
437 438 try:
438 439 request = get_current_request()
439 440 application = ApplicationService.by_id(resource_id)
440 441 if not application:
441 442 return
442 443 error_key = REDIS_KEYS[
443 444 'reports_to_notify_per_type_per_app_alerting'].format(
444 445 ReportType.error, resource_id)
445 446 slow_key = REDIS_KEYS[
446 447 'reports_to_notify_per_type_per_app_alerting'].format(
447 448 ReportType.slow, resource_id)
448 449 error_group_ids = Datastores.redis.smembers(error_key)
449 450 slow_group_ids = Datastores.redis.smembers(slow_key)
450 451 Datastores.redis.delete(error_key)
451 452 Datastores.redis.delete(slow_key)
452 453 err_gids = [int(g_id) for g_id in error_group_ids]
453 454 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
454 455 group_ids = err_gids + slow_gids
455 456 occurence_dict = {}
456 457 for g_id in group_ids:
457 458 key = REDIS_KEYS['counters'][
458 459 'report_group_occurences_alerting'].format(
459 460 g_id)
460 461 val = Datastores.redis.get(key)
461 462 Datastores.redis.delete(key)
462 463 if val:
463 464 occurence_dict[g_id] = int(val)
464 465 else:
465 466 occurence_dict[g_id] = 1
466 467 report_groups = ReportGroupService.by_ids(group_ids)
467 468 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
468 469
469 470 ApplicationService.check_for_groups_alert(
470 471 application, 'alert', report_groups=report_groups,
471 472 occurence_dict=occurence_dict, since_when=since_when)
472 473 except Exception as exc:
473 474 print_traceback(log)
474 475 raise
475 476
476 477
477 478 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
478 479 def close_alerts():
479 480 log.warning('Checking alerts')
480 481 since_when = datetime.utcnow()
481 482 try:
482 483 event_types = [Event.types['error_report_alert'],
483 484 Event.types['slow_report_alert'], ]
484 485 statuses = [Event.statuses['active']]
485 486 # get events older than 5 min
486 487 events = EventService.by_type_and_status(
487 488 event_types,
488 489 statuses,
489 490 older_than=(since_when - timedelta(minutes=5)))
490 491 for event in events:
491 492 # see if we can close them
492 493 event.validate_or_close(
493 494 since_when=(since_when - timedelta(minutes=1)))
494 495 except Exception as exc:
495 496 print_traceback(log)
496 497 raise
497 498
498 499
499 500 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
500 501 def update_tag_counter(tag_name, tag_value, count):
501 502 try:
502 503 query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
503 504 sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
504 505 sa.types.TEXT))
505 506 query.update({'times_seen': Tag.times_seen + count,
506 507 'last_timestamp': datetime.utcnow()},
507 508 synchronize_session=False)
508 509 session = DBSession()
509 510 mark_changed(session)
510 511 return True
511 512 except Exception as exc:
512 513 print_traceback(log)
513 514 update_tag_counter.retry(exc=exc)
514 515
515 516
516 517 @celery.task(queue="default")
517 518 def update_tag_counters():
518 519 """
519 520 Schedules tasks that update the counters for application tags
520 521 """
521 522 tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
522 523 Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
523 524 c = collections.Counter(tags)
524 525 for t_json, count in c.items():
525 526 tag_info = json.loads(t_json)
526 527 update_tag_counter.delay(tag_info[0], tag_info[1], count)
527 528
528 529
529 530 @celery.task(queue="default")
530 531 def daily_digest():
531 532 """
532 533 Sends a daily digest with the top 50 error reports
533 534 """
534 535 request = get_current_request()
535 536 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
536 537 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
537 538 since_when = datetime.utcnow() - timedelta(hours=8)
538 539 log.warning('Generating daily digests')
539 540 for resource_id in apps:
540 541 resource_id = resource_id.decode('utf8')
541 542 end_date = datetime.utcnow().replace(microsecond=0, second=0)
542 543 filter_settings = {'resource': [resource_id],
543 544 'tags': [{'name': 'type',
544 545 'value': ['error'], 'op': None}],
545 546 'type': 'error', 'start_date': since_when,
546 547 'end_date': end_date}
547 548
548 549 reports = ReportGroupService.get_trending(
549 550 request, filter_settings=filter_settings, limit=50)
550 551
551 552 application = ApplicationService.by_id(resource_id)
552 553 if application:
553 554 users = set([p.user for p in application.users_for_perm('view')])
554 555 for user in users:
555 556 user.send_digest(request, application, reports=reports,
556 557 since_when=since_when)
557 558
558 559
559 560 @celery.task(queue="default")
560 561 def notifications_reports():
561 562 """
562 563 Loop that checks redis for info and then issues new tasks to celery to
563 564 send report notifications
564 565 """
565 566 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
566 567 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
567 568 for app in apps:
568 569 log.warning('Notify for app: %s' % app)
569 570 check_user_report_notifications.delay(app.decode('utf8'))
570 571
571 572 @celery.task(queue="default")
572 573 def alerting_reports():
573 574 """
574 575 Loop that checks redis for info and then issues new tasks to celery to
575 576 determine the following:
576 577 - which applications should have new alerts opened
577 578 """
578 579
579 580 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
580 581 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
581 582 for app in apps:
582 583 log.warning('Notify for app: %s' % app)
583 584 check_alerts.delay(app.decode('utf8'))
584 585
585 586
586 587 @celery.task(queue="default", soft_time_limit=3600 * 4,
587 588 hard_time_limit=3600 * 4, max_retries=144)
588 589 def logs_cleanup(resource_id, filter_settings):
589 590 request = get_current_request()
590 591 request.tm.begin()
591 592 es_query = {
592 593 "_source": False,
593 594 "size": 5000,
594 595 "query": {
595 596 "filtered": {
596 597 "filter": {
597 598 "and": [{"term": {"resource_id": resource_id}}]
598 599 }
599 600 }
600 601 }
601 602 }
602 603
603 604 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
604 605 if filter_settings['namespace']:
605 606 query = query.filter(Log.namespace == filter_settings['namespace'][0])
606 607 es_query['query']['filtered']['filter']['and'].append(
607 608 {"term": {"namespace": filter_settings['namespace'][0]}}
608 609 )
609 610 query.delete(synchronize_session=False)
610 611 request.tm.commit()
611 612 result = request.es_conn.search(es_query, index='rcae_l_*',
612 613 doc_type='log', es_scroll='1m',
613 614 es_search_type='scan')
614 615 scroll_id = result['_scroll_id']
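# scroll through every matching log document in ES and delete them in
# small batches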
615 616 while True:
616 617 log.warning('log_cleanup, app:{} ns:{} batch'.format(
617 618 resource_id,
618 619 filter_settings['namespace']
619 620 ))
620 621 es_docs_to_delete = []
621 622 result = request.es_conn.send_request(
622 623 'POST', ['_search', 'scroll'],
623 624 body=scroll_id, query_params={"scroll": '1m'})
624 625 scroll_id = result['_scroll_id']
625 626 if not result['hits']['hits']:
626 627 break
627 628 for doc in result['hits']['hits']:
628 629 es_docs_to_delete.append({"id": doc['_id'],
629 630 "index": doc['_index']})
630 631
631 632 for batch in in_batches(es_docs_to_delete, 10):
632 633 Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
633 634 **to_del)
634 635 for to_del in batch])