tasks: when CELERY_EAGER_PROPAGATES_EXCEPTIONS is true, do not retry tasks
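With this change every retrying task checks the eager-propagation flag before calling retry(), so when tasks execute eagerly (for example in tests with CELERY_ALWAYS_EAGER enabled) the original exception bubbles up instead of being swallowed by a retry. A minimal sketch of the pattern, using a hypothetical example_task that only simulates a failure:

from appenlight.celery import celery


@celery.task(queue="default", default_retry_delay=1, max_retries=2)
def example_task():
    # hypothetical task, shown only to illustrate the guard added in this commit
    try:
        raise RuntimeError('simulated failure')
    except Exception as exc:
        # when eager propagation is enabled (tests / local eager runs),
        # re-raise the original exception instead of scheduling a retry
        if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
            raise
        example_task.retry(exc=exc)

Under a normal worker the behaviour is unchanged: the flag is false, so the task still retries with its configured delay and retry limit.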
@@ -1,662 +1,672 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import bisect
18 18 import collections
19 19 import math
20 20 from datetime import datetime, timedelta
21 21
22 22 import sqlalchemy as sa
23 23 import pyelasticsearch
24 24
25 25 from celery.utils.log import get_task_logger
26 26 from zope.sqlalchemy import mark_changed
27 27 from pyramid.threadlocal import get_current_request, get_current_registry
28 28 from ziggurat_foundations.models.services.resource import ResourceService
29 29
30 30 from appenlight.celery import celery
31 31 from appenlight.models.report_group import ReportGroup
32 32 from appenlight.models import DBSession, Datastores
33 33 from appenlight.models.report import Report
34 34 from appenlight.models.log import Log
35 35 from appenlight.models.metric import Metric
36 36 from appenlight.models.event import Event
37 37
38 38 from appenlight.models.services.application import ApplicationService
39 39 from appenlight.models.services.event import EventService
40 40 from appenlight.models.services.log import LogService
41 41 from appenlight.models.services.report import ReportService
42 42 from appenlight.models.services.report_group import ReportGroupService
43 43 from appenlight.models.services.user import UserService
44 44 from appenlight.models.tag import Tag
45 45 from appenlight.lib import print_traceback
46 46 from appenlight.lib.utils import parse_proto, in_batches
47 47 from appenlight.lib.ext_json import json
48 48 from appenlight.lib.redis_keys import REDIS_KEYS
49 49 from appenlight.lib.enums import ReportType
50 50
51 51 log = get_task_logger(__name__)
52 52
53 53 sample_boundries = list(range(100, 1000, 100)) + \
54 54 list(range(1000, 10000, 1000)) + \
55 55 list(range(10000, 100000, 5000))
56 56
57 57
58 58 def pick_sample(total_occurences, report_type=None):
59 59 every = 1.0
60 60 position = bisect.bisect_left(sample_boundries, total_occurences)
61 61 if position > 0:
62 62 if report_type == ReportType.not_found:
63 63 divide = 10.0
64 64 else:
65 65 divide = 100.0
66 66 every = sample_boundries[position - 1] / divide
67 67 return total_occurences % every == 0
68 68
69 69
70 70 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
71 71 def test_exception_task():
72 72 log.error('test celery log', extra={'location': 'celery'})
73 73 log.warning('test celery log', extra={'location': 'celery'})
74 74 raise Exception('Celery exception test')
75 75
76 76
77 77 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
78 78 def test_retry_exception_task():
79 79 try:
80 80 import time
81 81
82 82 time.sleep(1.3)
83 83 log.error('test retry celery log', extra={'location': 'celery'})
84 84 log.warning('test retry celery log', extra={'location': 'celery'})
85 85 raise Exception('Celery exception test')
86 86 except Exception as exc:
87 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
88 raise
87 89 test_retry_exception_task.retry(exc=exc)
88 90
89 91
90 92 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
91 93 def add_reports(resource_id, request_params, dataset, **kwargs):
92 94 proto_version = parse_proto(request_params.get('protocol_version', ''))
93 95 current_time = datetime.utcnow().replace(second=0, microsecond=0)
94 96 try:
95 97 # we will store ES docs here for a single bulk insert
96 98 es_report_docs = {}
97 99 es_report_group_docs = {}
98 100 resource = ApplicationService.by_id(resource_id)
99 101
100 102 tags = []
101 103 es_slow_calls_docs = {}
102 104 es_reports_stats_rows = {}
103 105 for report_data in dataset:
104 106 # build report details for later
105 107 added_details = 0
106 108 report = Report()
107 109 report.set_data(report_data, resource, proto_version)
108 110 report._skip_ft_index = True
109 111
110 112 # find latest group in this month's partition
111 113 report_group = ReportGroupService.by_hash_and_resource(
112 114 report.resource_id,
113 115 report.grouping_hash,
114 116 since_when=datetime.utcnow().date().replace(day=1)
115 117 )
116 118 occurences = report_data.get('occurences', 1)
117 119 if not report_group:
118 120 # total_reports will be incremented a moment later
119 121 report_group = ReportGroup(grouping_hash=report.grouping_hash,
120 122 occurences=0, total_reports=0,
121 123 last_report=0,
122 124 priority=report.priority,
123 125 error=report.error,
124 126 first_timestamp=report.start_time)
125 127 report_group._skip_ft_index = True
126 128 report_group.report_type = report.report_type
127 129 report.report_group_time = report_group.first_timestamp
128 130 add_sample = pick_sample(report_group.occurences,
129 131 report_type=report_group.report_type)
130 132 if add_sample:
131 133 resource.report_groups.append(report_group)
132 134 report_group.reports.append(report)
133 135 added_details += 1
134 136 DBSession.flush()
135 137 if report.partition_id not in es_report_docs:
136 138 es_report_docs[report.partition_id] = []
137 139 es_report_docs[report.partition_id].append(report.es_doc())
138 140 tags.extend(list(report.tags.items()))
139 141 slow_calls = report.add_slow_calls(report_data, report_group)
140 142 DBSession.flush()
141 143 for s_call in slow_calls:
142 144 if s_call.partition_id not in es_slow_calls_docs:
143 145 es_slow_calls_docs[s_call.partition_id] = []
144 146 es_slow_calls_docs[s_call.partition_id].append(
145 147 s_call.es_doc())
146 148 # try generating new stat rows if needed
147 149 else:
148 150 # required for postprocessing to not fail later
149 151 report.report_group = report_group
150 152
151 153 stat_row = ReportService.generate_stat_rows(
152 154 report, resource, report_group)
153 155 if stat_row.partition_id not in es_reports_stats_rows:
154 156 es_reports_stats_rows[stat_row.partition_id] = []
155 157 es_reports_stats_rows[stat_row.partition_id].append(
156 158 stat_row.es_doc())
157 159
158 160 # see if we should mark the 10th occurrence of the report
159 161 last_occurences_10 = int(math.floor(report_group.occurences / 10))
160 162 curr_occurences_10 = int(math.floor(
161 163 (report_group.occurences + report.occurences) / 10))
162 164 last_occurences_100 = int(
163 165 math.floor(report_group.occurences / 100))
164 166 curr_occurences_100 = int(math.floor(
165 167 (report_group.occurences + report.occurences) / 100))
166 168 notify_occurences_10 = last_occurences_10 != curr_occurences_10
167 169 notify_occurences_100 = last_occurences_100 != curr_occurences_100
168 170 report_group.occurences = ReportGroup.occurences + occurences
169 171 report_group.last_timestamp = report.start_time
170 172 report_group.summed_duration = ReportGroup.summed_duration + report.duration
171 173 summed_duration = ReportGroup.summed_duration + report.duration
172 174 summed_occurences = ReportGroup.occurences + occurences
173 175 report_group.average_duration = summed_duration / summed_occurences
174 176 report_group.run_postprocessing(report)
175 177 if added_details:
176 178 report_group.total_reports = ReportGroup.total_reports + 1
177 179 report_group.last_report = report.id
178 180 report_group.set_notification_info(notify_10=notify_occurences_10,
179 181 notify_100=notify_occurences_100)
180 182 DBSession.flush()
181 183 report_group.get_report().notify_channel(report_group)
182 184 if report_group.partition_id not in es_report_group_docs:
183 185 es_report_group_docs[report_group.partition_id] = []
184 186 es_report_group_docs[report_group.partition_id].append(
185 187 report_group.es_doc())
186 188
187 189 action = 'REPORT'
188 190 log_msg = '%s: %s %s, client: %s, proto: %s' % (
189 191 action,
190 192 report_data.get('http_status', 'unknown'),
191 193 str(resource),
192 194 report_data.get('client'),
193 195 proto_version)
194 196 log.info(log_msg)
195 197 total_reports = len(dataset)
196 198 redis_pipeline = Datastores.redis.pipeline(transaction=False)
197 199 key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
198 200 redis_pipeline.incr(key, total_reports)
199 201 redis_pipeline.expire(key, 3600 * 24)
200 202 key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
201 203 resource.owner_user_id, current_time)
202 204 redis_pipeline.incr(key, total_reports)
203 205 redis_pipeline.expire(key, 3600)
204 206 key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
205 207 resource_id, current_time.replace(minute=0))
206 208 redis_pipeline.incr(key, total_reports)
207 209 redis_pipeline.expire(key, 3600 * 24 * 7)
208 210 redis_pipeline.sadd(
209 211 REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
210 212 current_time.replace(minute=0)), resource_id)
211 213 redis_pipeline.execute()
212 214
213 215 add_reports_es(es_report_group_docs, es_report_docs)
214 216 add_reports_slow_calls_es(es_slow_calls_docs)
215 217 add_reports_stats_rows_es(es_reports_stats_rows)
216 218 return True
217 219 except Exception as exc:
218 220 print_traceback(log)
221 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
222 raise
219 223 add_reports.retry(exc=exc)
220 224
221 225
222 226 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
223 227 def add_reports_es(report_group_docs, report_docs):
224 228 for k, v in report_group_docs.items():
225 229 Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
226 230 for k, v in report_docs.items():
227 231 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
228 232 parent_field='_parent')
229 233
230 234
231 235 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
232 236 def add_reports_slow_calls_es(es_docs):
233 237 for k, v in es_docs.items():
234 238 Datastores.es.bulk_index(k, 'log', v)
235 239
236 240
237 241 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
238 242 def add_reports_stats_rows_es(es_docs):
239 243 for k, v in es_docs.items():
240 244 Datastores.es.bulk_index(k, 'log', v)
241 245
242 246
243 247 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
244 248 def add_logs(resource_id, request_params, dataset, **kwargs):
245 249 proto_version = request_params.get('protocol_version')
246 250 current_time = datetime.utcnow().replace(second=0, microsecond=0)
247 251
248 252 try:
249 253 es_docs = collections.defaultdict(list)
250 254 resource = ApplicationService.by_id_cached()(resource_id)
251 255 resource = DBSession.merge(resource, load=False)
252 256 ns_pairs = []
253 257 for entry in dataset:
254 258 # gather pk and ns so we can remove older versions of the row later
255 259 if entry['primary_key'] is not None:
256 260 ns_pairs.append({"pk": entry['primary_key'],
257 261 "ns": entry['namespace']})
258 262 log_entry = Log()
259 263 log_entry.set_data(entry, resource=resource)
260 264 log_entry._skip_ft_index = True
261 265 resource.logs.append(log_entry)
262 266 DBSession.flush()
263 267 # insert non-pk rows first
264 268 if entry['primary_key'] is None:
265 269 es_docs[log_entry.partition_id].append(log_entry.es_doc())
266 270
267 271 # 2nd pass to delete all log entries from db for the same pk/ns pair
268 272 if ns_pairs:
269 273 ids_to_delete = []
270 274 es_docs = collections.defaultdict(list)
271 275 es_docs_to_delete = collections.defaultdict(list)
272 276 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
273 277 list_of_pairs=ns_pairs)
274 278 log_dict = {}
275 279 for log_entry in found_pkey_logs:
276 280 log_key = (log_entry.primary_key, log_entry.namespace)
277 281 if log_key not in log_dict:
278 282 log_dict[log_key] = []
279 283 log_dict[log_key].append(log_entry)
280 284
281 285 for ns, entry_list in log_dict.items():
282 286 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
283 287 # newest row needs to be indexed in es
284 288 log_entry = entry_list[-1]
285 289 # delete everything from pg and ES, leave the last row in pg
286 290 for e in entry_list[:-1]:
287 291 ids_to_delete.append(e.log_id)
288 292 es_docs_to_delete[e.partition_id].append(e.delete_hash)
289 293
290 294 es_docs_to_delete[log_entry.partition_id].append(
291 295 log_entry.delete_hash)
292 296
293 297 es_docs[log_entry.partition_id].append(log_entry.es_doc())
294 298
295 299 if ids_to_delete:
296 300 query = DBSession.query(Log).filter(
297 301 Log.log_id.in_(ids_to_delete))
298 302 query.delete(synchronize_session=False)
299 303 if es_docs_to_delete:
300 304 # batch this to avoid problems with default ES bulk limits
301 305 for es_index in es_docs_to_delete.keys():
302 306 for batch in in_batches(es_docs_to_delete[es_index], 20):
303 307 query = {'terms': {'delete_hash': batch}}
304 308
305 309 try:
306 310 Datastores.es.delete_by_query(
307 311 es_index, 'log', query)
308 312 except pyelasticsearch.ElasticHttpNotFoundError as exc:
309 313 msg = 'skipping index {}'.format(es_index)
310 314 log.info(msg)
311 315
312 316 total_logs = len(dataset)
313 317
314 318 log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
315 319 str(resource),
316 320 total_logs,
317 321 proto_version)
318 322 log.info(log_msg)
319 323 # mark_changed(session)
320 324 redis_pipeline = Datastores.redis.pipeline(transaction=False)
321 325 key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
322 326 redis_pipeline.incr(key, total_logs)
323 327 redis_pipeline.expire(key, 3600 * 24)
324 328 key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
325 329 resource.owner_user_id, current_time)
326 330 redis_pipeline.incr(key, total_logs)
327 331 redis_pipeline.expire(key, 3600)
328 332 key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format(
329 333 resource_id, current_time.replace(minute=0))
330 334 redis_pipeline.incr(key, total_logs)
331 335 redis_pipeline.expire(key, 3600 * 24 * 7)
332 336 redis_pipeline.sadd(
333 337 REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
334 338 current_time.replace(minute=0)), resource_id)
335 339 redis_pipeline.execute()
336 340 add_logs_es(es_docs)
337 341 return True
338 342 except Exception as exc:
339 343 print_traceback(log)
344 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
345 raise
340 346 add_logs.retry(exc=exc)
341 347
342 348
343 349 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
344 350 def add_logs_es(es_docs):
345 351 for k, v in es_docs.items():
346 352 Datastores.es.bulk_index(k, 'log', v)
347 353
348 354
349 355 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
350 356 def add_metrics(resource_id, request_params, dataset, proto_version):
351 357 current_time = datetime.utcnow().replace(second=0, microsecond=0)
352 358 try:
353 359 resource = ApplicationService.by_id_cached()(resource_id)
354 360 resource = DBSession.merge(resource, load=False)
355 361 es_docs = []
356 362 rows = []
357 363 for metric in dataset:
358 364 tags = dict(metric['tags'])
359 365 server_n = tags.get('server_name', metric['server_name']).lower()
360 366 tags['server_name'] = server_n or 'unknown'
361 367 new_metric = Metric(
362 368 timestamp=metric['timestamp'],
363 369 resource_id=resource.resource_id,
364 370 namespace=metric['namespace'],
365 371 tags=tags)
366 372 rows.append(new_metric)
367 373 es_docs.append(new_metric.es_doc())
368 374 session = DBSession()
369 375 session.bulk_save_objects(rows)
370 376 session.flush()
371 377
372 378 action = 'METRICS'
373 379 metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
374 380 action,
375 381 str(resource),
376 382 len(dataset),
377 383 proto_version
378 384 )
379 385 log.info(metrics_msg)
380 386
381 387 mark_changed(session)
382 388 redis_pipeline = Datastores.redis.pipeline(transaction=False)
383 389 key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
384 390 redis_pipeline.incr(key, len(rows))
385 391 redis_pipeline.expire(key, 3600 * 24)
386 392 key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
387 393 resource.owner_user_id, current_time)
388 394 redis_pipeline.incr(key, len(rows))
389 395 redis_pipeline.expire(key, 3600)
390 396 key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format(
391 397 resource_id, current_time.replace(minute=0))
392 398 redis_pipeline.incr(key, len(rows))
393 399 redis_pipeline.expire(key, 3600 * 24 * 7)
394 400 redis_pipeline.sadd(
395 401 REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
396 402 current_time.replace(minute=0)), resource_id)
397 403 redis_pipeline.execute()
398 404 add_metrics_es(es_docs)
399 405 return True
400 406 except Exception as exc:
401 407 print_traceback(log)
408 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
409 raise
402 410 add_metrics.retry(exc=exc)
403 411
404 412
405 413 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
406 414 def add_metrics_es(es_docs):
407 415 for doc in es_docs:
408 416 partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
409 417 Datastores.es.index(partition, 'log', doc)
410 418
411 419
412 420 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
413 421 def check_user_report_notifications(resource_id):
414 422 since_when = datetime.utcnow()
415 423 try:
416 424 request = get_current_request()
417 425 application = ApplicationService.by_id(resource_id)
418 426 if not application:
419 427 return
420 428 error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
421 429 ReportType.error, resource_id)
422 430 slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
423 431 ReportType.slow, resource_id)
424 432 error_group_ids = Datastores.redis.smembers(error_key)
425 433 slow_group_ids = Datastores.redis.smembers(slow_key)
426 434 Datastores.redis.delete(error_key)
427 435 Datastores.redis.delete(slow_key)
428 436 err_gids = [int(g_id) for g_id in error_group_ids]
429 437 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
430 438 group_ids = err_gids + slow_gids
431 439 occurence_dict = {}
432 440 for g_id in group_ids:
433 441 key = REDIS_KEYS['counters']['report_group_occurences'].format(
434 442 g_id)
435 443 val = Datastores.redis.get(key)
436 444 Datastores.redis.delete(key)
437 445 if val:
438 446 occurence_dict[g_id] = int(val)
439 447 else:
440 448 occurence_dict[g_id] = 1
441 449 report_groups = ReportGroupService.by_ids(group_ids)
442 450 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
443 451
444 452 ApplicationService.check_for_groups_alert(
445 453 application, 'alert', report_groups=report_groups,
446 454 occurence_dict=occurence_dict)
447 455 users = set([p.user for p in ResourceService.users_for_perm(application, 'view')])
448 456 report_groups = report_groups.all()
449 457 for user in users:
450 458 UserService.report_notify(user, request, application,
451 459 report_groups=report_groups,
452 460 occurence_dict=occurence_dict)
453 461 for group in report_groups:
454 462 # marks report_groups as notified
455 463 if not group.notified:
456 464 group.notified = True
457 465 except Exception as exc:
458 466 print_traceback(log)
459 467 raise
460 468
461 469
462 470 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
463 471 def check_alerts(resource_id):
464 472 since_when = datetime.utcnow()
465 473 try:
466 474 request = get_current_request()
467 475 application = ApplicationService.by_id(resource_id)
468 476 if not application:
469 477 return
470 478 error_key = REDIS_KEYS[
471 479 'reports_to_notify_per_type_per_app_alerting'].format(
472 480 ReportType.error, resource_id)
473 481 slow_key = REDIS_KEYS[
474 482 'reports_to_notify_per_type_per_app_alerting'].format(
475 483 ReportType.slow, resource_id)
476 484 error_group_ids = Datastores.redis.smembers(error_key)
477 485 slow_group_ids = Datastores.redis.smembers(slow_key)
478 486 Datastores.redis.delete(error_key)
479 487 Datastores.redis.delete(slow_key)
480 488 err_gids = [int(g_id) for g_id in error_group_ids]
481 489 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
482 490 group_ids = err_gids + slow_gids
483 491 occurence_dict = {}
484 492 for g_id in group_ids:
485 493 key = REDIS_KEYS['counters'][
486 494 'report_group_occurences_alerting'].format(
487 495 g_id)
488 496 val = Datastores.redis.get(key)
489 497 Datastores.redis.delete(key)
490 498 if val:
491 499 occurence_dict[g_id] = int(val)
492 500 else:
493 501 occurence_dict[g_id] = 1
494 502 report_groups = ReportGroupService.by_ids(group_ids)
495 503 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
496 504
497 505 ApplicationService.check_for_groups_alert(
498 506 application, 'alert', report_groups=report_groups,
499 507 occurence_dict=occurence_dict, since_when=since_when)
500 508 except Exception as exc:
501 509 print_traceback(log)
502 510 raise
503 511
504 512
505 513 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
506 514 def close_alerts():
507 515 log.warning('Checking alerts')
508 516 since_when = datetime.utcnow()
509 517 try:
510 518 event_types = [Event.types['error_report_alert'],
511 519 Event.types['slow_report_alert'], ]
512 520 statuses = [Event.statuses['active']]
513 521 # get events older than 5 min
514 522 events = EventService.by_type_and_status(
515 523 event_types,
516 524 statuses,
517 525 older_than=(since_when - timedelta(minutes=5)))
518 526 for event in events:
519 527 # see if we can close them
520 528 event.validate_or_close(
521 529 since_when=(since_when - timedelta(minutes=1)))
522 530 except Exception as exc:
523 531 print_traceback(log)
524 532 raise
525 533
526 534
527 535 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
528 536 def update_tag_counter(tag_name, tag_value, count):
529 537 try:
530 538 query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
531 539 sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
532 540 sa.types.TEXT))
533 541 query.update({'times_seen': Tag.times_seen + count,
534 542 'last_timestamp': datetime.utcnow()},
535 543 synchronize_session=False)
536 544 session = DBSession()
537 545 mark_changed(session)
538 546 return True
539 547 except Exception as exc:
540 548 print_traceback(log)
549 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
550 raise
541 551 update_tag_counter.retry(exc=exc)
542 552
543 553
544 554 @celery.task(queue="default")
545 555 def update_tag_counters():
546 556 """
547 557 Spawns tasks to update counters for application tags
548 558 """
549 559 tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
550 560 Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
551 561 c = collections.Counter(tags)
552 562 for t_json, count in c.items():
553 563 tag_info = json.loads(t_json)
554 564 update_tag_counter.delay(tag_info[0], tag_info[1], count)
555 565
556 566
557 567 @celery.task(queue="default")
558 568 def daily_digest():
559 569 """
560 570 Sends daily digest with top 50 error reports
561 571 """
562 572 request = get_current_request()
563 573 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
564 574 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
565 575 since_when = datetime.utcnow() - timedelta(hours=8)
566 576 log.warning('Generating daily digests')
567 577 for resource_id in apps:
568 578 resource_id = resource_id.decode('utf8')
569 579 end_date = datetime.utcnow().replace(microsecond=0, second=0)
570 580 filter_settings = {'resource': [resource_id],
571 581 'tags': [{'name': 'type',
572 582 'value': ['error'], 'op': None}],
573 583 'type': 'error', 'start_date': since_when,
574 584 'end_date': end_date}
575 585
576 586 reports = ReportGroupService.get_trending(
577 587 request, filter_settings=filter_settings, limit=50)
578 588
579 589 application = ApplicationService.by_id(resource_id)
580 590 if application:
581 591 users = set([p.user for p in ResourceService.users_for_perm(application, 'view')])
582 592 for user in users:
583 593 user.send_digest(request, application, reports=reports,
584 594 since_when=since_when)
585 595
586 596
587 597 @celery.task(queue="default")
588 598 def notifications_reports():
589 599 """
590 600 Loop that checks redis for info and then issues new celery tasks to
591 601 send notifications
592 602 """
593 603 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
594 604 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
595 605 for app in apps:
596 606 log.warning('Notify for app: %s' % app)
597 607 check_user_report_notifications.delay(app.decode('utf8'))
598 608
599 609 @celery.task(queue="default")
600 610 def alerting_reports():
601 611 """
602 612 Loop that checks redis for info and then issues new tasks to celery to
603 613 perform the following:
604 614 - which applications should have new alerts opened
605 615 """
606 616
607 617 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
608 618 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
609 619 for app in apps:
610 620 log.warning('Notify for app: %s' % app)
611 621 check_alerts.delay(app.decode('utf8'))
612 622
613 623
614 624 @celery.task(queue="default", soft_time_limit=3600 * 4,
615 625 hard_time_limit=3600 * 4, max_retries=144)
616 626 def logs_cleanup(resource_id, filter_settings):
617 627 request = get_current_request()
618 628 request.tm.begin()
619 629 es_query = {
620 630 "_source": False,
621 631 "size": 5000,
622 632 "query": {
623 633 "filtered": {
624 634 "filter": {
625 635 "and": [{"term": {"resource_id": resource_id}}]
626 636 }
627 637 }
628 638 }
629 639 }
630 640
631 641 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
632 642 if filter_settings['namespace']:
633 643 query = query.filter(Log.namespace == filter_settings['namespace'][0])
634 644 es_query['query']['filtered']['filter']['and'].append(
635 645 {"term": {"namespace": filter_settings['namespace'][0]}}
636 646 )
637 647 query.delete(synchronize_session=False)
638 648 request.tm.commit()
639 649 result = request.es_conn.search(es_query, index='rcae_l_*',
640 650 doc_type='log', es_scroll='1m',
641 651 es_search_type='scan')
642 652 scroll_id = result['_scroll_id']
643 653 while True:
644 654 log.warning('log_cleanup, app:{} ns:{} batch'.format(
645 655 resource_id,
646 656 filter_settings['namespace']
647 657 ))
648 658 es_docs_to_delete = []
649 659 result = request.es_conn.send_request(
650 660 'POST', ['_search', 'scroll'],
651 661 body=scroll_id, query_params={"scroll": '1m'})
652 662 scroll_id = result['_scroll_id']
653 663 if not result['hits']['hits']:
654 664 break
655 665 for doc in result['hits']['hits']:
656 666 es_docs_to_delete.append({"id": doc['_id'],
657 667 "index": doc['_index']})
658 668
659 669 for batch in in_batches(es_docs_to_delete, 10):
660 670 Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
661 671 **to_del)
662 672 for to_del in batch])
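For reference, a minimal sketch of how the change behaves under eager execution, assuming the eager settings are enabled in a test configuration and that this module is importable as appenlight.celery.tasks:

import pytest

from appenlight.celery import celery
from appenlight.celery.tasks import test_retry_exception_task


def test_eager_exception_propagation():
    # Celery 3.x settings: run tasks synchronously and let exceptions bubble up
    celery.conf.update(CELERY_ALWAYS_EAGER=True,
                       CELERY_EAGER_PROPAGATES_EXCEPTIONS=True)
    # with the guard from this commit the original exception propagates
    # immediately instead of entering the retry path
    with pytest.raises(Exception):
        test_retry_exception_task.delay()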