tasks: change retry amount
@@ -1,634 +1,634 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 import bisect
23 23 import collections
24 24 import math
25 25 from datetime import datetime, timedelta
26 26
27 27 import sqlalchemy as sa
28 28 import pyelasticsearch
29 29
30 30 from celery.utils.log import get_task_logger
31 31 from zope.sqlalchemy import mark_changed
32 32 from pyramid.threadlocal import get_current_request, get_current_registry
33 33 from appenlight.celery import celery
34 34 from appenlight.models.report_group import ReportGroup
35 35 from appenlight.models import DBSession, Datastores
36 36 from appenlight.models.report import Report
37 37 from appenlight.models.log import Log
38 38 from appenlight.models.request_metric import Metric
39 39 from appenlight.models.event import Event
40 40
41 41 from appenlight.models.services.application import ApplicationService
42 42 from appenlight.models.services.event import EventService
43 43 from appenlight.models.services.log import LogService
44 44 from appenlight.models.services.report import ReportService
45 45 from appenlight.models.services.report_group import ReportGroupService
46 46 from appenlight.models.services.user import UserService
47 47 from appenlight.models.tag import Tag
48 48 from appenlight.lib import print_traceback
49 49 from appenlight.lib.utils import parse_proto, in_batches
50 50 from appenlight.lib.ext_json import json
51 51 from appenlight.lib.redis_keys import REDIS_KEYS
52 52 from appenlight.lib.enums import ReportType
53 53
54 54 log = get_task_logger(__name__)
55 55
56 56 sample_boundries = list(range(100, 1000, 100)) + \
57 57 list(range(1000, 10000, 1000)) + \
58 58 list(range(10000, 100000, 5000))
59 59
60 60
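# pick_sample() decides whether a full report row is persisted once a group
# grows large. Groups with fewer than 100 occurrences keep every report;
# above that, only every Nth report is stored, where N is the nearest lower
# sample boundary divided by 100 (e.g. at 2,500 occurrences N = 2000 / 100 = 20).
# 'not found' groups divide by 10 instead, which gives a larger N, so noisy
# 404s are sampled even more sparsely.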
61 61 def pick_sample(total_occurences, report_type=None):
62 62 every = 1.0
63 63 position = bisect.bisect_left(sample_boundries, total_occurences)
64 64 if position > 0:
65 65 if report_type == ReportType.not_found:
66 66 divide = 10.0
67 67 else:
68 68 divide = 100.0
69 69 every = sample_boundries[position - 1] / divide
70 70 return total_occurences % every == 0
71 71
72 72
73 73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
74 74 def test_exception_task():
75 75 log.error('test celery log', extra={'location': 'celery'})
76 76 log.warning('test celery log', extra={'location': 'celery'})
77 77 raise Exception('Celery exception test')
78 78
79 79
80 80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
81 81 def test_retry_exception_task():
82 82 try:
83 83 import time
84 84
85 85 time.sleep(1.3)
86 86 log.error('test retry celery log', extra={'location': 'celery'})
87 87 log.warning('test retry celery log', extra={'location': 'celery'})
88 88 raise Exception('Celery exception test')
89 89 except Exception as exc:
90 90 test_retry_exception_task.retry(exc=exc)
91 91
92 92
93 @celery.task(queue="reports", default_retry_delay=600, max_retries=999)
93 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
94 94 def add_reports(resource_id, params, dataset, environ=None, **kwargs):
95 95 proto_version = parse_proto(params.get('protocol_version', ''))
96 96 current_time = datetime.utcnow().replace(second=0, microsecond=0)
97 97 try:
99 99 # collect es docs here so they can be bulk-inserted in one go later
99 99 es_report_docs = {}
100 100 es_report_group_docs = {}
101 101 resource = ApplicationService.by_id(resource_id)
102 102
103 103 tags = []
104 104 es_slow_calls_docs = {}
105 105 es_reports_stats_rows = {}
106 106 for report_data in dataset:
107 107 # build report details for later
108 108 added_details = 0
109 109 report = Report()
110 110 report.set_data(report_data, resource, proto_version)
111 111 report._skip_ft_index = True
112 112
113 113 report_group = ReportGroupService.by_hash_and_resource(
114 114 report.resource_id,
115 115 report.grouping_hash
116 116 )
117 117 occurences = report_data.get('occurences', 1)
118 118 if not report_group:
119 119 # total_reports will be incremented by 1 a moment later
120 120 report_group = ReportGroup(grouping_hash=report.grouping_hash,
121 121 occurences=0, total_reports=0,
122 122 last_report=0,
123 123 priority=report.priority,
124 124 error=report.error,
125 125 first_timestamp=report.start_time)
126 126 report_group._skip_ft_index = True
127 127 report_group.report_type = report.report_type
128 128 report.report_group_time = report_group.first_timestamp
129 129 add_sample = pick_sample(report_group.occurences,
130 130 report_type=report_group.report_type)
131 131 if add_sample:
132 132 resource.report_groups.append(report_group)
133 133 report_group.reports.append(report)
134 134 added_details += 1
135 135 DBSession.flush()
136 136 if report.partition_id not in es_report_docs:
137 137 es_report_docs[report.partition_id] = []
138 138 es_report_docs[report.partition_id].append(report.es_doc())
139 139 tags.extend(list(report.tags.items()))
140 140 slow_calls = report.add_slow_calls(report_data, report_group)
141 141 DBSession.flush()
142 142 for s_call in slow_calls:
143 143 if s_call.partition_id not in es_slow_calls_docs:
144 144 es_slow_calls_docs[s_call.partition_id] = []
145 145 es_slow_calls_docs[s_call.partition_id].append(
146 146 s_call.es_doc())
147 147 # try generating new stat rows if needed
148 148 else:
149 149 # required for postprocessing to not fail later
150 150 report.report_group = report_group
151 151
152 152 stat_row = ReportService.generate_stat_rows(
153 153 report, resource, report_group)
154 154 if stat_row.partition_id not in es_reports_stats_rows:
155 155 es_reports_stats_rows[stat_row.partition_id] = []
156 156 es_reports_stats_rows[stat_row.partition_id].append(
157 157 stat_row.es_doc())
158 158
159 159 # see if this batch pushes the group past a 10th or 100th occurrence boundary
160 160 last_occurences_10 = int(math.floor(report_group.occurences / 10))
161 161 curr_occurences_10 = int(math.floor(
162 162 (report_group.occurences + report.occurences) / 10))
163 163 last_occurences_100 = int(
164 164 math.floor(report_group.occurences / 100))
165 165 curr_occurences_100 = int(math.floor(
166 166 (report_group.occurences + report.occurences) / 100))
167 167 notify_occurences_10 = last_occurences_10 != curr_occurences_10
168 168 notify_occurences_100 = last_occurences_100 != curr_occurences_100
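# assigning "ReportGroup.occurences + occurences" (class attribute plus a
# plain value) hands SQLAlchemy a SQL expression, so the UPDATE below
# increments the counters server-side instead of read-modify-write in
# Python; concurrent workers therefore cannot clobber each other's counts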
169 169 report_group.occurences = ReportGroup.occurences + occurences
170 170 report_group.last_timestamp = report.start_time
171 171 report_group.summed_duration = ReportGroup.summed_duration + report.duration
172 172 summed_duration = ReportGroup.summed_duration + report.duration
173 173 summed_occurences = ReportGroup.occurences + occurences
174 174 report_group.average_duration = summed_duration / summed_occurences
175 175 report_group.run_postprocessing(report)
176 176 if added_details:
177 177 report_group.total_reports = ReportGroup.total_reports + 1
178 178 report_group.last_report = report.id
179 179 report_group.set_notification_info(notify_10=notify_occurences_10,
180 180 notify_100=notify_occurences_100)
181 181 DBSession.flush()
182 182 report_group.get_report().notify_channel(report_group)
183 183 if report_group.partition_id not in es_report_group_docs:
184 184 es_report_group_docs[report_group.partition_id] = []
185 185 es_report_group_docs[report_group.partition_id].append(
186 186 report_group.es_doc())
187 187
188 188 action = 'REPORT'
189 189 log_msg = '%s: %s %s, client: %s, proto: %s' % (
190 190 action,
191 191 report_data.get('http_status', 'unknown'),
192 192 str(resource),
193 193 report_data.get('client'),
194 194 proto_version)
195 195 log.info(log_msg)
196 196 total_reports = len(dataset)
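# current_time was truncated to the minute above, so these redis keys act as
# per-minute ingest counters; the 24h expiry keeps stale keys from piling up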
197 197 key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
198 198 Datastores.redis.incr(key, total_reports)
199 199 Datastores.redis.expire(key, 3600 * 24)
200 200 key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
201 201 resource_id, current_time)
202 202 Datastores.redis.incr(key, total_reports)
203 203 Datastores.redis.expire(key, 3600 * 24)
204 204
205 205 add_reports_es(es_report_group_docs, es_report_docs)
206 206 add_reports_slow_calls_es(es_slow_calls_docs)
207 207 add_reports_stats_rows_es(es_reports_stats_rows)
208 208 return True
209 209 except Exception as exc:
210 210 print_traceback(log)
211 211 add_reports.retry(exc=exc)
212 212
213 213
214 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
214 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
215 215 def add_reports_es(report_group_docs, report_docs):
216 216 for k, v in report_group_docs.items():
217 217 Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
218 218 for k, v in report_docs.items():
219 219 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
220 220 parent_field='_parent')
221 221
222 222
223 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
223 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
224 224 def add_reports_slow_calls_es(es_docs):
225 225 for k, v in es_docs.items():
226 226 Datastores.es.bulk_index(k, 'log', v)
227 227
228 228
229 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
229 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
230 230 def add_reports_stats_rows_es(es_docs):
231 231 for k, v in es_docs.items():
232 232 Datastores.es.bulk_index(k, 'log', v)
233 233
234 234
235 @celery.task(queue="logs", default_retry_delay=600, max_retries=999)
235 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
236 236 def add_logs(resource_id, request, dataset, environ=None, **kwargs):
237 237 proto_version = request.get('protocol_version')
238 238 current_time = datetime.utcnow().replace(second=0, microsecond=0)
239 239
240 240 try:
241 241 es_docs = collections.defaultdict(list)
242 242 application = ApplicationService.by_id(resource_id)
243 243 ns_pairs = []
244 244 for entry in dataset:
245 245 # gather pk and ns so we can remove older versions of row later
246 246 if entry['primary_key'] is not None:
247 247 ns_pairs.append({"pk": entry['primary_key'],
248 248 "ns": entry['namespace']})
249 249 log_entry = Log()
250 250 log_entry.set_data(entry, resource=application)
251 251 log_entry._skip_ft_index = True
252 252 application.logs.append(log_entry)
253 253 DBSession.flush()
254 254 # insert non pk rows first
255 255 if entry['primary_key'] is None:
256 256 es_docs[log_entry.partition_id].append(log_entry.es_doc())
257 257
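# entries that carry a primary_key behave like upserts: the second pass
# below keeps only the newest row per (primary_key, namespace) pair,
# deletes the older rows from postgres, removes the matching ES documents
# by delete_hash and then re-indexes just that newest row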
258 258 # 2nd pass to delete all log entries from db for the same pk/ns pair
259 259 if ns_pairs:
260 260 ids_to_delete = []
261 261 es_docs = collections.defaultdict(list)
262 262 es_docs_to_delete = collections.defaultdict(list)
263 263 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
264 264 list_of_pairs=ns_pairs)
265 265 log_dict = {}
266 266 for log_entry in found_pkey_logs:
267 267 log_key = (log_entry.primary_key, log_entry.namespace)
268 268 if log_key not in log_dict:
269 269 log_dict[log_key] = []
270 270 log_dict[log_key].append(log_entry)
271 271
272 272 for ns, entry_list in log_dict.items():
273 273 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
274 274 # newest row needs to be indexed in es
275 275 log_entry = entry_list[-1]
276 276 # delete everything from pg and ES, leave the last row in pg
277 277 for e in entry_list[:-1]:
278 278 ids_to_delete.append(e.log_id)
279 279 es_docs_to_delete[e.partition_id].append(e.delete_hash)
280 280
281 281 es_docs_to_delete[log_entry.partition_id].append(
282 282 log_entry.delete_hash)
283 283
284 284 es_docs[log_entry.partition_id].append(log_entry.es_doc())
285 285
286 286 if ids_to_delete:
287 287 query = DBSession.query(Log).filter(
288 288 Log.log_id.in_(ids_to_delete))
289 289 query.delete(synchronize_session=False)
290 290 if es_docs_to_delete:
291 291 # batch this to avoid problems with default ES bulk limits
292 292 for es_index in es_docs_to_delete.keys():
293 293 for batch in in_batches(es_docs_to_delete[es_index], 20):
294 294 query = {'terms': {'delete_hash': batch}}
295 295
296 296 try:
297 297 Datastores.es.delete_by_query(
298 298 es_index, 'log', query)
299 299 except pyelasticsearch.ElasticHttpNotFoundError as exc:
300 300 log.error(exc)
301 301
302 302 total_logs = len(dataset)
303 303
304 304 log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
305 305 str(application),
306 306 total_logs,
307 307 proto_version)
308 308 log.info(log_msg)
309 309 # mark_changed(session)
310 310 key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
311 311 Datastores.redis.incr(key, total_logs)
312 312 Datastores.redis.expire(key, 3600 * 24)
313 313 key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
314 314 resource_id, current_time)
315 315 Datastores.redis.incr(key, total_logs)
316 316 Datastores.redis.expire(key, 3600 * 24)
317 317 add_logs_es(es_docs)
318 318 return True
319 319 except Exception as exc:
320 320 print_traceback(log)
321 321 add_logs.retry(exc=exc)
322 322
323 323
324 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
324 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
325 325 def add_logs_es(es_docs):
326 326 for k, v in es_docs.items():
327 327 Datastores.es.bulk_index(k, 'log', v)
328 328
329 329
330 @celery.task(queue="metrics", default_retry_delay=600, max_retries=999)
330 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
331 331 def add_metrics(resource_id, request, dataset, proto_version):
332 332 current_time = datetime.utcnow().replace(second=0, microsecond=0)
333 333 try:
334 334 application = ApplicationService.by_id_cached()(resource_id)
335 335 application = DBSession.merge(application, load=False)
336 336 es_docs = []
337 337 rows = []
338 338 for metric in dataset:
339 339 tags = dict(metric['tags'])
340 340 server_n = tags.get('server_name', metric['server_name']).lower()
341 341 tags['server_name'] = server_n or 'unknown'
342 342 new_metric = Metric(
343 343 timestamp=metric['timestamp'],
344 344 resource_id=application.resource_id,
345 345 namespace=metric['namespace'],
346 346 tags=tags)
347 347 rows.append(new_metric)
348 348 es_docs.append(new_metric.es_doc())
349 349 session = DBSession()
350 350 session.bulk_save_objects(rows)
351 351 session.flush()
352 352
353 353 action = 'METRICS'
354 354 metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
355 355 action,
356 356 str(application),
357 357 len(dataset),
358 358 proto_version
359 359 )
360 360 log.info(metrics_msg)
361 361
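# bulk_save_objects() bypasses the regular ORM unit-of-work flush, so the
# zope.sqlalchemy extension does not see any changes on its own;
# mark_changed() tells the transaction manager there is work to commit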
362 362 mark_changed(session)
363 363 key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
364 364 Datastores.redis.incr(key, len(rows))
365 365 Datastores.redis.expire(key, 3600 * 24)
366 366 key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
367 367 resource_id, current_time)
368 368 Datastores.redis.incr(key, len(rows))
369 369 Datastores.redis.expire(key, 3600 * 24)
370 370 add_metrics_es(es_docs)
371 371 return True
372 372 except Exception as exc:
373 373 print_traceback(log)
374 374 add_metrics.retry(exc=exc)
375 375
376 376
377 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
377 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
378 378 def add_metrics_es(es_docs):
379 379 for doc in es_docs:
380 380 partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
381 381 Datastores.es.index(partition, 'log', doc)
382 382
383 383
384 384 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
385 385 def check_user_report_notifications(resource_id):
386 386 since_when = datetime.utcnow()
387 387 try:
388 388 request = get_current_request()
389 389 application = ApplicationService.by_id(resource_id)
390 390 if not application:
391 391 return
392 392 error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
393 393 ReportType.error, resource_id)
394 394 slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
395 395 ReportType.slow, resource_id)
396 396 error_group_ids = Datastores.redis.smembers(error_key)
397 397 slow_group_ids = Datastores.redis.smembers(slow_key)
398 398 Datastores.redis.delete(error_key)
399 399 Datastores.redis.delete(slow_key)
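# reading the members and then deleting the sets claims the pending group
# ids for this run; groups reported after this point land in fresh sets and
# are handled by the next scheduled check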
400 400 err_gids = [int(g_id) for g_id in error_group_ids]
401 401 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
402 402 group_ids = err_gids + slow_gids
403 403 occurence_dict = {}
404 404 for g_id in group_ids:
405 405 key = REDIS_KEYS['counters']['report_group_occurences'].format(
406 406 g_id)
407 407 val = Datastores.redis.get(key)
408 408 Datastores.redis.delete(key)
409 409 if val:
410 410 occurence_dict[g_id] = int(val)
411 411 else:
412 412 occurence_dict[g_id] = 1
413 413 report_groups = ReportGroupService.by_ids(group_ids)
414 414 report_groups = report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
415 415
416 416 ApplicationService.check_for_groups_alert(
417 417 application, 'alert', report_groups=report_groups,
418 418 occurence_dict=occurence_dict)
419 419 users = set([p.user for p in application.users_for_perm('view')])
420 420 report_groups = report_groups.all()
421 421 for user in users:
422 422 UserService.report_notify(user, request, application,
423 423 report_groups=report_groups,
424 424 occurence_dict=occurence_dict)
425 425 for group in report_groups:
426 426 # marks report_groups as notified
427 427 if not group.notified:
428 428 group.notified = True
429 429 except Exception as exc:
430 430 print_traceback(log)
431 431 raise
432 432
433 433
434 434 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
435 435 def check_alerts(resource_id):
436 436 since_when = datetime.utcnow()
437 437 try:
438 438 request = get_current_request()
439 439 application = ApplicationService.by_id(resource_id)
440 440 if not application:
441 441 return
442 442 error_key = REDIS_KEYS[
443 443 'reports_to_notify_per_type_per_app_alerting'].format(
444 444 ReportType.error, resource_id)
445 445 slow_key = REDIS_KEYS[
446 446 'reports_to_notify_per_type_per_app_alerting'].format(
447 447 ReportType.slow, resource_id)
448 448 error_group_ids = Datastores.redis.smembers(error_key)
449 449 slow_group_ids = Datastores.redis.smembers(slow_key)
450 450 Datastores.redis.delete(error_key)
451 451 Datastores.redis.delete(slow_key)
452 452 err_gids = [int(g_id) for g_id in error_group_ids]
453 453 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
454 454 group_ids = err_gids + slow_gids
455 455 occurence_dict = {}
456 456 for g_id in group_ids:
457 457 key = REDIS_KEYS['counters'][
458 458 'report_group_occurences_alerting'].format(
459 459 g_id)
460 460 val = Datastores.redis.get(key)
461 461 Datastores.redis.delete(key)
462 462 if val:
463 463 occurence_dict[g_id] = int(val)
464 464 else:
465 465 occurence_dict[g_id] = 1
466 466 report_groups = ReportGroupService.by_ids(group_ids)
467 467 report_groups = report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
468 468
469 469 ApplicationService.check_for_groups_alert(
470 470 application, 'alert', report_groups=report_groups,
471 471 occurence_dict=occurence_dict, since_when=since_when)
472 472 except Exception as exc:
473 473 print_traceback(log)
474 474 raise
475 475
476 476
477 477 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
478 478 def close_alerts():
479 479 log.warning('Checking alerts')
480 480 since_when = datetime.utcnow()
481 481 try:
482 482 event_types = [Event.types['error_report_alert'],
483 483 Event.types['slow_report_alert'], ]
484 484 statuses = [Event.statuses['active']]
485 485 # get events older than 5 min
486 486 events = EventService.by_type_and_status(
487 487 event_types,
488 488 statuses,
489 489 older_than=(since_when - timedelta(minutes=5)))
490 490 for event in events:
491 491 # see if we can close them
492 492 event.validate_or_close(
493 493 since_when=(since_when - timedelta(minutes=1)))
494 494 except Exception as exc:
495 495 print_traceback(log)
496 496 raise
497 497
498 498
499 @celery.task(queue="default", default_retry_delay=600, max_retries=999)
499 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
500 500 def update_tag_counter(tag_name, tag_value, count):
501 501 try:
502 502 query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
503 503 sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
504 504 sa.types.TEXT))
505 505 query.update({'times_seen': Tag.times_seen + count,
506 506 'last_timestamp': datetime.utcnow()},
507 507 synchronize_session=False)
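# query.update() above runs outside the ORM flush cycle, so the session has
# to be marked changed explicitly for zope.sqlalchemy to commit the change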
508 508 session = DBSession()
509 509 mark_changed(session)
510 510 return True
511 511 except Exception as exc:
512 512 print_traceback(log)
513 513 update_tag_counter.retry(exc=exc)
514 514
515 515
516 516 @celery.task(queue="default")
517 517 def update_tag_counters():
518 518 """
519 519 Schedules tasks that update the counters for application tags
520 520 """
521 521 tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
522 522 Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
523 523 c = collections.Counter(tags)
524 524 for t_json, count in c.items():
525 525 tag_info = json.loads(t_json)
526 526 update_tag_counter.delay(tag_info[0], tag_info[1], count)
527 527
528 528
529 529 @celery.task(queue="default")
530 530 def daily_digest():
531 531 """
532 532 Sends daily digest with top 50 error reports
533 533 """
534 534 request = get_current_request()
535 535 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
536 536 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
537 537 since_when = datetime.utcnow() - timedelta(hours=8)
538 538 log.warning('Generating daily digests')
539 539 for resource_id in apps:
540 540 resource_id = resource_id.decode('utf8')
541 541 end_date = datetime.utcnow().replace(microsecond=0, second=0)
542 542 filter_settings = {'resource': [resource_id],
543 543 'tags': [{'name': 'type',
544 544 'value': ['error'], 'op': None}],
545 545 'type': 'error', 'start_date': since_when,
546 546 'end_date': end_date}
547 547
548 548 reports = ReportGroupService.get_trending(
549 549 request, filter_settings=filter_settings, limit=50)
550 550
551 551 application = ApplicationService.by_id(resource_id)
552 552 if application:
553 553 users = set([p.user for p in application.users_for_perm('view')])
554 554 for user in users:
555 555 user.send_digest(request, application, reports=reports,
556 556 since_when=since_when)
557 557
558 558
559 559 @celery.task(queue="default")
560 560 def notifications_reports():
561 561 """
562 562 Checks redis for applications that received new reports and schedules
563 563 celery tasks that send out user notifications
564 564 """
565 565 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
566 566 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
567 567 for app in apps:
568 568 log.warning('Notify for app: %s' % app)
569 569 check_user_report_notifications.delay(app.decode('utf8'))
570 570
571 571 @celery.task(queue="default")
572 572 def alerting_reports():
573 573 """
574 574 Checks redis for applications that received new reports and schedules
575 575 celery tasks that determine which applications should have
576 576 new alerts opened
577 577 """
578 578
579 579 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
580 580 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
581 581 for app in apps:
582 582 log.warning('Notify for app: %s' % app)
583 583 check_alerts.delay(app.decode('utf8'))
584 584
585 585
586 586 @celery.task(queue="default", soft_time_limit=3600 * 4,
587 hard_time_limit=3600 * 4, max_retries=999)
587 hard_time_limit=3600 * 4, max_retries=144)
588 588 def logs_cleanup(resource_id, filter_settings):
589 589 request = get_current_request()
590 590 request.tm.begin()
591 591 es_query = {
592 592 "_source": False,
593 593 "size": 5000,
594 594 "query": {
595 595 "filtered": {
596 596 "filter": {
597 597 "and": [{"term": {"resource_id": resource_id}}]
598 598 }
599 599 }
600 600 }
601 601 }
602 602
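# delete the matching rows from postgres right away, then walk the ES
# scan/scroll cursor for the same filter and bulk-delete the returned
# document ids in small batches until the scroll runs dry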
603 603 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
604 604 if filter_settings['namespace']:
605 605 query = query.filter(Log.namespace == filter_settings['namespace'][0])
606 606 es_query['query']['filtered']['filter']['and'].append(
607 607 {"term": {"namespace": filter_settings['namespace'][0]}}
608 608 )
609 609 query.delete(synchronize_session=False)
610 610 request.tm.commit()
611 611 result = request.es_conn.search(es_query, index='rcae_l_*',
612 612 doc_type='log', es_scroll='1m',
613 613 es_search_type='scan')
614 614 scroll_id = result['_scroll_id']
615 615 while True:
616 616 log.warning('log_cleanup, app:{} ns:{} batch'.format(
617 617 resource_id,
618 618 filter_settings['namespace']
619 619 ))
620 620 es_docs_to_delete = []
621 621 result = request.es_conn.send_request(
622 622 'POST', ['_search', 'scroll'],
623 623 body=scroll_id, query_params={"scroll": '1m'})
624 624 scroll_id = result['_scroll_id']
625 625 if not result['hits']['hits']:
626 626 break
627 627 for doc in result['hits']['hits']:
628 628 es_docs_to_delete.append({"id": doc['_id'],
629 629 "index": doc['_index']})
630 630
631 631 for batch in in_batches(es_docs_to_delete, 10):
632 632 Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
633 633 **to_del)
634 634 for to_del in batch])