##// END OF EJS Templates
celery: decouple report notifications from alerts
ergo -
Show More
@@ -50,6 +50,7 b" celery.user_options['preload'].add("
50 50 help='Specifies pyramid configuration file location.')
51 51 )
52 52
53
53 54 @user_preload_options.connect
54 55 def on_preload_parsed(options, **kwargs):
55 56 """
@@ -86,23 +87,37 b' celery_config = {'
86 87 'CELERYD_CONCURRENCY': None,
87 88 'CELERY_TIMEZONE': None,
88 89 'CELERYBEAT_SCHEDULE': {
89 'alerting': {
90 'task': 'appenlight.celery.tasks.alerting',
90 'alerting_reports': {
91 'task': 'appenlight.celery.tasks.alerting_reports',
91 92 'schedule': timedelta(seconds=60)
92 93 },
93 'daily_digest': {
94 'task': 'appenlight.celery.tasks.daily_digest',
95 'schedule': crontab(minute=1, hour='4,12,20')
96 },
94 'close_alerts': {
95 'task': 'appenlight.celery.tasks.close_alerts',
96 'schedule': timedelta(seconds=60)
97 }
97 98 }
98 99 }
99 100 celery.config_from_object(celery_config)
100 101
102
101 103 def configure_celery(pyramid_registry):
102 104 settings = pyramid_registry.settings
103 105 celery_config['BROKER_URL'] = settings['celery.broker_url']
104 106 celery_config['CELERYD_CONCURRENCY'] = settings['celery.concurrency']
105 107 celery_config['CELERY_TIMEZONE'] = settings['celery.timezone']
108
109 notifications_seconds = int(settings.get('tasks.notifications_reports.interval', 60))
110
111 celery_config['CELERYBEAT_SCHEDULE']['notifications'] = {
112 'task': 'appenlight.celery.tasks.notifications_reports',
113 'schedule': timedelta(seconds=notifications_seconds)
114 }
115
116 celery_config['CELERYBEAT_SCHEDULE']['daily_digest'] = {
117 'task': 'appenlight.celery.tasks.daily_digest',
118 'schedule': crontab(minute=1, hour='4,12,20')
119 }
120
106 121 if asbool(settings.get('celery.always_eager')):
107 122 celery_config['CELERY_ALWAYS_EAGER'] = True
108 123 celery_config['CELERY_EAGER_PROPAGATES_EXCEPTIONS'] = True
@@ -122,13 +137,13 b' def task_prerun_signal(task_id, task, args, kwargs, **kwaargs):'
122 137 env = celery.pyramid
123 138 env = prepare(registry=env['request'].registry)
124 139 proper_base_url = env['request'].registry.settings['mailing.app_url']
125 tmp_request = Request.blank('/', base_url=proper_base_url)
140 tmp_req = Request.blank('/', base_url=proper_base_url)
126 141 # ensure tasks generate url for right domain from config
127 env['request'].environ['HTTP_HOST'] = tmp_request.environ['HTTP_HOST']
128 env['request'].environ['SERVER_PORT'] = tmp_request.environ['SERVER_PORT']
129 env['request'].environ['SERVER_NAME'] = tmp_request.environ['SERVER_NAME']
130 env['request'].environ['wsgi.url_scheme'] = tmp_request.environ[
131 'wsgi.url_scheme']
142 env['request'].environ['HTTP_HOST'] = tmp_req.environ['HTTP_HOST']
143 env['request'].environ['SERVER_PORT'] = tmp_req.environ['SERVER_PORT']
144 env['request'].environ['SERVER_NAME'] = tmp_req.environ['SERVER_NAME']
145 env['request'].environ['wsgi.url_scheme'] = \
146 tmp_req.environ['wsgi.url_scheme']
132 147 get_current_request().tm.begin()
133 148
134 149
@@ -410,7 +410,8 b' def add_metrics_es(es_docs):'
410 410
411 411
412 412 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
413 def check_user_report_notifications(resource_id, since_when=None):
413 def check_user_report_notifications(resource_id):
414 since_when = datetime.utcnow()
414 415 try:
415 416 request = get_current_request()
416 417 application = ApplicationService.by_id(resource_id)
@@ -442,14 +443,13 b' def check_user_report_notifications(resource_id, since_when=None):'
442 443
443 444 ApplicationService.check_for_groups_alert(
444 445 application, 'alert', report_groups=report_groups,
445 occurence_dict=occurence_dict, since_when=since_when)
446 occurence_dict=occurence_dict)
446 447 users = set([p.user for p in application.users_for_perm('view')])
447 448 report_groups = report_groups.all()
448 449 for user in users:
449 450 UserService.report_notify(user, request, application,
450 451 report_groups=report_groups,
451 occurence_dict=occurence_dict,
452 since_when=since_when)
452 occurence_dict=occurence_dict)
453 453 for group in report_groups:
454 454 # marks report_groups as notified
455 455 if not group.notified:
@@ -459,9 +459,53 b' def check_user_report_notifications(resource_id, since_when=None):'
459 459 raise
460 460
461 461
462 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
463 def check_alerts(resource_id):
464 since_when = datetime.utcnow()
465 try:
466 request = get_current_request()
467 application = ApplicationService.by_id(resource_id)
468 if not application:
469 return
470 error_key = REDIS_KEYS[
471 'reports_to_notify_per_type_per_app_alerting'].format(
472 ReportType.error, resource_id)
473 slow_key = REDIS_KEYS[
474 'reports_to_notify_per_type_per_app_alerting'].format(
475 ReportType.slow, resource_id)
476 error_group_ids = Datastores.redis.smembers(error_key)
477 slow_group_ids = Datastores.redis.smembers(slow_key)
478 Datastores.redis.delete(error_key)
479 Datastores.redis.delete(slow_key)
480 err_gids = [int(g_id) for g_id in error_group_ids]
481 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
482 group_ids = err_gids + slow_gids
483 occurence_dict = {}
484 for g_id in group_ids:
485 key = REDIS_KEYS['counters'][
486 'report_group_occurences_alerting'].format(
487 g_id)
488 val = Datastores.redis.get(key)
489 Datastores.redis.delete(key)
490 if val:
491 occurence_dict[g_id] = int(val)
492 else:
493 occurence_dict[g_id] = 1
494 report_groups = ReportGroupService.by_ids(group_ids)
495 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
496
497 ApplicationService.check_for_groups_alert(
498 application, 'alert', report_groups=report_groups,
499 occurence_dict=occurence_dict, since_when=since_when)
500 except Exception as exc:
501 print_traceback(log)
502 raise
503
504
462 505 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
463 def close_alerts(since_when=None):
506 def close_alerts():
464 507 log.warning('Checking alerts')
508 since_when = datetime.utcnow()
465 509 try:
466 510 event_types = [Event.types['error_report_alert'],
467 511 Event.types['slow_report_alert'], ]
@@ -541,26 +585,34 b' def daily_digest():'
541 585
542 586
543 587 @celery.task(queue="default")
544 def alerting():
588 def notifications_reports():
545 589 """
546 590 Loop that checks redis for info and then issues new tasks to celery to
547 perform the following:
548 - which applications should have new alerts opened
549 - which currently opened alerts should be closed
591 issue notifications
550 592 """
551 start_time = datetime.utcnow()
552 # transactions are needed for mailer
553 593 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
554 594 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
555 595 for app in apps:
556 596 log.warning('Notify for app: %s' % app)
557 597 check_user_report_notifications.delay(app.decode('utf8'))
558 # clear app ids from set
559 close_alerts.delay(since_when=start_time)
598
599 @celery.task(queue="default")
600 def alerting_reports():
601 """
602 Loop that checks redis for info and then issues new tasks to celery to
603 perform the following:
604 - which applications should have new alerts opened
605 """
606
607 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
608 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
609 for app in apps:
610 log.warning('Notify for app: %s' % app)
611 check_alerts.delay(app.decode('utf8'))
560 612
561 613
562 @celery.task(queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4,
563 max_retries=999)
614 @celery.task(queue="default", soft_time_limit=3600 * 4,
615 hard_time_limit=3600 * 4, max_retries=999)
564 616 def logs_cleanup(resource_id, filter_settings):
565 617 request = get_current_request()
566 618 request.tm.begin()
@@ -39,6 +39,7 b' def to_integer_safe(input):'
39 39 except (TypeError, ValueError,):
40 40 return None
41 41
42
42 43 def print_traceback(log):
43 44 traceback = get_current_traceback(skip=1, show_hidden_frames=True,
44 45 ignore_system_exceptions=True)
@@ -47,6 +48,7 b' def print_traceback(log):'
47 48 log.error(traceback.plaintext)
48 49 del traceback
49 50
51
50 52 def get_callable(import_string):
51 53 import_module, indexer_callable = import_string.split(':')
52 54 return getattr(importlib.import_module(import_module),
@@ -38,6 +38,8 b' REDIS_KEYS = {'
38 38 'metrics_per_minute_per_app': BASE.format(
39 39 'metrics_per_minute_per_app:{}:{}'),
40 40 'report_group_occurences': BASE.format('report_group_occurences:{}'),
41 'report_group_occurences_alerting': BASE.format(
42 'report_group_occurences_alerting:{}'),
41 43 'report_group_occurences_10th': BASE.format(
42 44 'report_group_occurences_10th:{}'),
43 45 'report_group_occurences_100th': BASE.format(
@@ -53,7 +55,13 b' REDIS_KEYS = {'
53 55 },
54 56 'apps_that_had_reports': BASE.format('apps_that_had_reports'),
55 57 'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
58 'apps_that_had_reports_alerting': BASE.format(
59 'apps_that_had_reports_alerting'),
60 'apps_that_had_error_reports_alerting': BASE.format(
61 'apps_that_had_error_reports_alerting'),
56 62 'reports_to_notify_per_type_per_app': BASE.format(
57 63 'reports_to_notify_per_type_per_app:{}:{}'),
64 'reports_to_notify_per_type_per_app_alerting': BASE.format(
65 'reports_to_notify_per_type_per_app_alerting:{}:{}'),
58 66 'seen_tag_list': BASE.format('seen_tag_list')
59 67 }
@@ -193,16 +193,25 b' class ReportGroup(Base, BaseModel):'
193 193 self.report_type, current_time)
194 194 Datastores.redis.incr(key)
195 195 Datastores.redis.expire(key, 3600 * 24)
196 # detailed app notification
196 # detailed app notification for alerts and notifications
197 197 Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'],
198 198 self.resource_id)
199 Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports_alerting'],
200 self.resource_id)
199 201 # only notify for exceptions here
200 202 if self.report_type == ReportType.error:
201 Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'],
203 Datastores.redis.sadd(
204 REDIS_KEYS['apps_that_had_reports'],
205 self.resource_id)
206 Datastores.redis.sadd(
207 REDIS_KEYS['apps_that_had_error_reports_alerting'],
202 208 self.resource_id)
203 209 key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
204 210 Datastores.redis.incr(key)
205 211 Datastores.redis.expire(key, 3600 * 24)
212 key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id)
213 Datastores.redis.incr(key)
214 Datastores.redis.expire(key, 3600 * 24)
206 215
207 216 if notify_10:
208 217 key = REDIS_KEYS['counters'][
@@ -217,6 +226,10 b' class ReportGroup(Base, BaseModel):'
217 226 self.report_type, self.resource_id)
218 227 Datastores.redis.sadd(key, self.id)
219 228 Datastores.redis.expire(key, 3600 * 24)
229 key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format(
230 self.report_type, self.resource_id)
231 Datastores.redis.sadd(key, self.id)
232 Datastores.redis.expire(key, 3600 * 24)
220 233
221 234 @property
222 235 def partition_id(self):
@@ -117,12 +117,10 b' class UserService(BaseService):'
117 117
118 118 @classmethod
119 119 def report_notify(cls, user, request, application, report_groups,
120 occurence_dict, since_when=None, db_session=None):
120 occurence_dict, db_session=None):
121 121 db_session = get_db_session(db_session)
122 122 if not report_groups:
123 123 return True
124
125 if not since_when:
126 124 since_when = datetime.utcnow()
127 125 for channel in cls.get_valid_channels(user):
128 126 confirmed_groups = []
@@ -101,6 +101,11 b' celery.broker_url = redis://localhost:6379/3'
101 101 celery.concurrency = 2
102 102 celery.timezone = UTC
103 103
104 # tasks
105
106 # how often run alerting tasks (60s default)
107 tasks.notifications_reports.interval = 60
108
104 109 [filter:paste_prefix]
105 110 use = egg:PasteDeploy#prefix
106 111
@@ -108,6 +113,9 b' use = egg:PasteDeploy#prefix'
108 113 [filter:appenlight_client]
109 114 use = egg:appenlight_client
110 115 appenlight.api_key =
116
117 # appenlight.transport_config = http://127.0.0.1:6543?threaded=1&timeout=5&verify=0
118 # by default uses api.appenlight.com server
111 119 appenlight.transport_config =
112 120 appenlight.report_local_vars = true
113 121 appenlight.report_404 = true
General Comments 0
You need to be logged in to leave comments. Login now