celery: decouple report notifications from alerts
ergo -
@@ -50,6 +50,7 @@ celery.user_options['preload'].add(
            help='Specifies pyramid configuration file location.')
 )
 
+
 @user_preload_options.connect
 def on_preload_parsed(options, **kwargs):
     """
@@ -75,7 +76,7 @@ def on_preload_parsed(options, **kwargs):
 
 
 celery_config = {
-    'CELERY_IMPORTS': ["appenlight.celery.tasks",],
+    'CELERY_IMPORTS': ["appenlight.celery.tasks", ],
     'CELERYD_TASK_TIME_LIMIT': 60,
     'CELERYD_MAX_TASKS_PER_CHILD': 1000,
     'CELERY_IGNORE_RESULT': True,
@@ -86,23 +87,37 @@ celery_config = {
     'CELERYD_CONCURRENCY': None,
     'CELERY_TIMEZONE': None,
     'CELERYBEAT_SCHEDULE': {
-        'alerting': {
-            'task': 'appenlight.celery.tasks.alerting',
+        'alerting_reports': {
+            'task': 'appenlight.celery.tasks.alerting_reports',
             'schedule': timedelta(seconds=60)
         },
-        'daily_digest': {
-            'task': 'appenlight.celery.tasks.daily_digest',
-            'schedule': crontab(minute=1, hour='4,12,20')
+        'close_alerts': {
+            'task': 'appenlight.celery.tasks.close_alerts',
+            'schedule': timedelta(seconds=60)
         },
     }
 }
 celery.config_from_object(celery_config)
 
+
 def configure_celery(pyramid_registry):
     settings = pyramid_registry.settings
     celery_config['BROKER_URL'] = settings['celery.broker_url']
     celery_config['CELERYD_CONCURRENCY'] = settings['celery.concurrency']
     celery_config['CELERY_TIMEZONE'] = settings['celery.timezone']
+
+    notifications_seconds = int(settings.get('tasks.notifications_reports.interval', 60))
+
+    celery_config['CELERYBEAT_SCHEDULE']['notifications'] = {
+        'task': 'appenlight.celery.tasks.notifications_reports',
+        'schedule': timedelta(seconds=notifications_seconds)
+    }
+
+    celery_config['CELERYBEAT_SCHEDULE']['daily_digest'] = {
+        'task': 'appenlight.celery.tasks.daily_digest',
+        'schedule': crontab(minute=1, hour='4,12,20')
+    }
+
     if asbool(settings.get('celery.always_eager')):
         celery_config['CELERY_ALWAYS_EAGER'] = True
         celery_config['CELERY_EAGER_PROPAGATES_EXCEPTIONS'] = True
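For orientation, here is a minimal, self-contained sketch of the pattern the hunk above relies on: an interval is read from the Pyramid settings (with a 60-second fallback) and turned into a timedelta beat schedule entry. The settings dict and its value below are illustrative; only the tasks.notifications_reports.interval key and the task name come from the change itself.

    from datetime import timedelta

    # stand-in for pyramid_registry.settings; the real value comes from the .ini file
    settings = {'tasks.notifications_reports.interval': '30'}

    interval = int(settings.get('tasks.notifications_reports.interval', 60))
    beat_entry = {
        'task': 'appenlight.celery.tasks.notifications_reports',
        'schedule': timedelta(seconds=interval),  # celery beat fires the task every `interval` seconds
    }
    print(beat_entry['schedule'])  # 0:00:30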
@@ -122,13 +137,13 @@ def task_prerun_signal(task_id, task, args, kwargs, **kwaargs):
     env = celery.pyramid
     env = prepare(registry=env['request'].registry)
     proper_base_url = env['request'].registry.settings['mailing.app_url']
-    tmp_request = Request.blank('/', base_url=proper_base_url)
+    tmp_req = Request.blank('/', base_url=proper_base_url)
     # ensure tasks generate url for right domain from config
-    env['request'].environ['HTTP_HOST'] = tmp_request.environ['HTTP_HOST']
-    env['request'].environ['SERVER_PORT'] = tmp_request.environ['SERVER_PORT']
-    env['request'].environ['SERVER_NAME'] = tmp_request.environ['SERVER_NAME']
-    env['request'].environ['wsgi.url_scheme'] = tmp_request.environ[
-        'wsgi.url_scheme']
+    env['request'].environ['HTTP_HOST'] = tmp_req.environ['HTTP_HOST']
+    env['request'].environ['SERVER_PORT'] = tmp_req.environ['SERVER_PORT']
+    env['request'].environ['SERVER_NAME'] = tmp_req.environ['SERVER_NAME']
+    env['request'].environ['wsgi.url_scheme'] = \
+        tmp_req.environ['wsgi.url_scheme']
     get_current_request().tm.begin()
 
 
@@ -410,7 +410,8 @@ def add_metrics_es(es_docs):
 
 
 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
-def check_user_report_notifications(resource_id, since_when=None):
+def check_user_report_notifications(resource_id):
+    since_when = datetime.utcnow()
     try:
         request = get_current_request()
         application = ApplicationService.by_id(resource_id)
@@ -442,14 +443,13 @@ def check_user_report_notifications(resource_id, since_when=None):
 
         ApplicationService.check_for_groups_alert(
             application, 'alert', report_groups=report_groups,
-            occurence_dict=occurence_dict, since_when=since_when)
+            occurence_dict=occurence_dict)
         users = set([p.user for p in application.users_for_perm('view')])
         report_groups = report_groups.all()
         for user in users:
             UserService.report_notify(user, request, application,
                                       report_groups=report_groups,
-                                      occurence_dict=occurence_dict,
-                                      since_when=since_when)
+                                      occurence_dict=occurence_dict)
         for group in report_groups:
             # marks report_groups as notified
             if not group.notified:
@@ -459,9 +459,53 @@ def check_user_report_notifications(resource_id, since_when=None):
         raise
 
 
+@celery.task(queue="default", default_retry_delay=5, max_retries=2)
+def check_alerts(resource_id):
+    since_when = datetime.utcnow()
+    try:
+        request = get_current_request()
+        application = ApplicationService.by_id(resource_id)
+        if not application:
+            return
+        error_key = REDIS_KEYS[
+            'reports_to_notify_per_type_per_app_alerting'].format(
+            ReportType.error, resource_id)
+        slow_key = REDIS_KEYS[
+            'reports_to_notify_per_type_per_app_alerting'].format(
+            ReportType.slow, resource_id)
+        error_group_ids = Datastores.redis.smembers(error_key)
+        slow_group_ids = Datastores.redis.smembers(slow_key)
+        Datastores.redis.delete(error_key)
+        Datastores.redis.delete(slow_key)
+        err_gids = [int(g_id) for g_id in error_group_ids]
+        slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
+        group_ids = err_gids + slow_gids
+        occurence_dict = {}
+        for g_id in group_ids:
+            key = REDIS_KEYS['counters'][
+                'report_group_occurences_alerting'].format(
+                g_id)
+            val = Datastores.redis.get(key)
+            Datastores.redis.delete(key)
+            if val:
+                occurence_dict[g_id] = int(val)
+            else:
+                occurence_dict[g_id] = 1
+        report_groups = ReportGroupService.by_ids(group_ids)
+        report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
+
+        ApplicationService.check_for_groups_alert(
+            application, 'alert', report_groups=report_groups,
+            occurence_dict=occurence_dict, since_when=since_when)
+    except Exception as exc:
+        print_traceback(log)
+        raise
+
+
 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
-def close_alerts(since_when=None):
+def close_alerts():
     log.warning('Checking alerts')
+    since_when = datetime.utcnow()
     try:
         event_types = [Event.types['error_report_alert'],
                        Event.types['slow_report_alert'], ]
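The new check_alerts task leans on a simple Redis hand-off: producers add report-group ids to a per-app set and bump a per-group counter, and the consumer reads both and deletes the keys so each occurrence is alerted on at most once. Below is a rough sketch of that hand-off with redis-py; the literal key strings are simplified stand-ins for the REDIS_KEYS templates, and the ids are made up.

    import redis

    r = redis.StrictRedis()  # assumes a local Redis; connection details are illustrative

    # producer side: record that report group 42 of application 5 needs alerting
    r.sadd('reports_to_notify_per_type_per_app_alerting:error:5', 42)
    r.incr('report_group_occurences_alerting:42')

    # consumer side (roughly what check_alerts does): drain the set, then the counters
    key = 'reports_to_notify_per_type_per_app_alerting:error:5'
    group_ids = [int(gid) for gid in r.smembers(key)]
    r.delete(key)

    occurences = {}
    for gid in group_ids:
        counter = 'report_group_occurences_alerting:%s' % gid
        val = r.get(counter)
        r.delete(counter)
        occurences[gid] = int(val) if val else 1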
@@ -541,26 +585,34 @@ def daily_digest():
 
 
 @celery.task(queue="default")
-def alerting():
+def notifications_reports():
     """
     Loop that checks redis for info and then issues new tasks to celery to
-    perform the following:
-    - which applications should have new alerts opened
-    - which currently opened alerts should be closed
+    issue notifications
     """
-    start_time = datetime.utcnow()
-    # transactions are needed for mailer
     apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
     Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
     for app in apps:
         log.warning('Notify for app: %s' % app)
         check_user_report_notifications.delay(app.decode('utf8'))
-    # clear app ids from set
-    close_alerts.delay(since_when=start_time)
+
+@celery.task(queue="default")
+def alerting_reports():
+    """
+    Loop that checks redis for info and then issues new tasks to celery to
+    perform the following:
+    - which applications should have new alerts opened
+    """
+
+    apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
+    Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
+    for app in apps:
+        log.warning('Notify for app: %s' % app)
+        check_alerts.delay(app.decode('utf8'))
 
 
-@celery.task(queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4,
-             max_retries=999)
+@celery.task(queue="default", soft_time_limit=3600 * 4,
+             hard_time_limit=3600 * 4, max_retries=999)
 def logs_cleanup(resource_id, filter_settings):
     request = get_current_request()
     request.tm.begin()
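Both beat tasks above follow the same fan-out shape: the scheduled task only drains a Redis set of application ids and enqueues one lightweight per-application task for each. A minimal sketch of that shape, assuming a registered Celery app; the broker URL and the hard-coded id set are placeholders, not values from this change.

    from celery import Celery

    app = Celery('sketch', broker='redis://localhost:6379/3')  # broker URL is illustrative

    @app.task
    def check_alerts(resource_id):
        # per-application work (querying report groups, opening alerts) would happen here
        print('alerting for application', resource_id)

    @app.task
    def alerting_reports():
        pending = {b'1', b'7'}  # stand-in for the drained Redis set of app ids
        for app_id in pending:
            check_alerts.delay(app_id.decode('utf8'))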
@@ -30,7 +30,7 @@ from appenlight_client.exceptions import get_current_traceback
 
 def generate_random_string(chars=10):
     return ''.join(random.sample(string.ascii_letters * 2 + string.digits,
                                  chars))
 
 
 def to_integer_safe(input):
@@ -39,6 +39,7 @@ def to_integer_safe(input):
     except (TypeError, ValueError,):
         return None
 
+
 def print_traceback(log):
     traceback = get_current_traceback(skip=1, show_hidden_frames=True,
                                       ignore_system_exceptions=True)
@@ -47,6 +48,7 @@ def print_traceback(log):
     log.error(traceback.plaintext)
     del traceback
 
+
 def get_callable(import_string):
     import_module, indexer_callable = import_string.split(':')
     return getattr(importlib.import_module(import_module),
@@ -38,6 +38,8 @@ REDIS_KEYS = {
         'metrics_per_minute_per_app': BASE.format(
             'metrics_per_minute_per_app:{}:{}'),
         'report_group_occurences': BASE.format('report_group_occurences:{}'),
+        'report_group_occurences_alerting': BASE.format(
+            'report_group_occurences_alerting:{}'),
         'report_group_occurences_10th': BASE.format(
             'report_group_occurences_10th:{}'),
         'report_group_occurences_100th': BASE.format(
@@ -53,7 +55,13 @@ REDIS_KEYS = {
     },
     'apps_that_had_reports': BASE.format('apps_that_had_reports'),
    'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
+    'apps_that_had_reports_alerting': BASE.format(
+        'apps_that_had_reports_alerting'),
+    'apps_that_had_error_reports_alerting': BASE.format(
+        'apps_that_had_error_reports_alerting'),
     'reports_to_notify_per_type_per_app': BASE.format(
         'reports_to_notify_per_type_per_app:{}:{}'),
+    'reports_to_notify_per_type_per_app_alerting': BASE.format(
+        'reports_to_notify_per_type_per_app_alerting:{}:{}'),
     'seen_tag_list': BASE.format('seen_tag_list')
 }
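The *_alerting entries added here are format templates like the existing ones: BASE supplies a common prefix and the remaining {} slots are filled with the report type and resource id at call time. A small sketch of the expansion; the 'appenlight:data:{}' prefix is an assumption standing in for whatever BASE actually contains.

    BASE = 'appenlight:data:{}'  # assumed prefix; the real BASE may differ

    REDIS_KEYS = {
        'reports_to_notify_per_type_per_app_alerting': BASE.format(
            'reports_to_notify_per_type_per_app_alerting:{}:{}'),
    }

    key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format('error', 5)
    # -> 'appenlight:data:reports_to_notify_per_type_per_app_alerting:error:5'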
@@ -141,7 +141,7 @@ class ReportGroup(Base, BaseModel):
             report_dict = report.get_dict(request)
             # if was not processed yet
             if (rule_obj.match(report_dict) and
                     action.pkey not in self.triggered_postprocesses_ids):
                 action.postprocess(self)
                 # this way sqla can track mutation of list
                 self.triggered_postprocesses_ids = \
@@ -193,16 +193,25 @@ class ReportGroup(Base, BaseModel):
             self.report_type, current_time)
         Datastores.redis.incr(key)
         Datastores.redis.expire(key, 3600 * 24)
-        # detailed app notification
+        # detailed app notification for alerts and notifications
         Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'],
                               self.resource_id)
+        Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports_alerting'],
+                              self.resource_id)
         # only notify for exceptions here
         if self.report_type == ReportType.error:
-            Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'],
-                                  self.resource_id)
+            Datastores.redis.sadd(
+                REDIS_KEYS['apps_that_had_reports'],
+                self.resource_id)
+            Datastores.redis.sadd(
+                REDIS_KEYS['apps_that_had_error_reports_alerting'],
+                self.resource_id)
         key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
         Datastores.redis.incr(key)
         Datastores.redis.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id)
+        Datastores.redis.incr(key)
+        Datastores.redis.expire(key, 3600 * 24)
 
         if notify_10:
             key = REDIS_KEYS['counters'][
@@ -217,6 +226,10 @@ class ReportGroup(Base, BaseModel):
             self.report_type, self.resource_id)
         Datastores.redis.sadd(key, self.id)
         Datastores.redis.expire(key, 3600 * 24)
+        key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format(
+            self.report_type, self.resource_id)
+        Datastores.redis.sadd(key, self.id)
+        Datastores.redis.expire(key, 3600 * 24)
 
     @property
     def partition_id(self):
@@ -117,13 +117,11 @@ class UserService(BaseService):
 
     @classmethod
     def report_notify(cls, user, request, application, report_groups,
-                      occurence_dict, since_when=None, db_session=None):
+                      occurence_dict, db_session=None):
         db_session = get_db_session(db_session)
         if not report_groups:
             return True
-
-        if not since_when:
-            since_when = datetime.utcnow()
+        since_when = datetime.utcnow()
         for channel in cls.get_valid_channels(user):
             confirmed_groups = []
 
@@ -101,6 +101,11 @@ celery.broker_url = redis://localhost:6379/3
 celery.concurrency = 2
 celery.timezone = UTC
 
+# tasks
+
+# how often run alerting tasks (60s default)
+tasks.notifications_reports.interval = 60
+
 [filter:paste_prefix]
 use = egg:PasteDeploy#prefix
 
@@ -108,6 +113,9 @@ use = egg:PasteDeploy#prefix
 [filter:appenlight_client]
 use = egg:appenlight_client
 appenlight.api_key =
+
+# appenlight.transport_config = http://127.0.0.1:6543?threaded=1&timeout=5&verify=0
+# by default uses api.appenlight.com server
 appenlight.transport_config =
 appenlight.report_local_vars = true
 appenlight.report_404 = true