From 97f5bf911c41f03b4876120cb9e4626425bd66b7 2016-06-21 16:20:49 From: Marcin Lulek Date: 2016-06-21 16:20:49 Subject: [PATCH] celery: decouple report notifications from alerts --- diff --git a/backend/src/appenlight/celery/__init__.py b/backend/src/appenlight/celery/__init__.py index 04bde68..385d8d3 100644 --- a/backend/src/appenlight/celery/__init__.py +++ b/backend/src/appenlight/celery/__init__.py @@ -50,6 +50,7 @@ celery.user_options['preload'].add( help='Specifies pyramid configuration file location.') ) + @user_preload_options.connect def on_preload_parsed(options, **kwargs): """ @@ -75,7 +76,7 @@ def on_preload_parsed(options, **kwargs): celery_config = { - 'CELERY_IMPORTS': ["appenlight.celery.tasks",], + 'CELERY_IMPORTS': ["appenlight.celery.tasks", ], 'CELERYD_TASK_TIME_LIMIT': 60, 'CELERYD_MAX_TASKS_PER_CHILD': 1000, 'CELERY_IGNORE_RESULT': True, @@ -86,23 +87,37 @@ celery_config = { 'CELERYD_CONCURRENCY': None, 'CELERY_TIMEZONE': None, 'CELERYBEAT_SCHEDULE': { - 'alerting': { - 'task': 'appenlight.celery.tasks.alerting', + 'alerting_reports': { + 'task': 'appenlight.celery.tasks.alerting_reports', 'schedule': timedelta(seconds=60) }, - 'daily_digest': { - 'task': 'appenlight.celery.tasks.daily_digest', - 'schedule': crontab(minute=1, hour='4,12,20') - }, + 'close_alerts': { + 'task': 'appenlight.celery.tasks.close_alerts', + 'schedule': timedelta(seconds=60) + } } } celery.config_from_object(celery_config) + def configure_celery(pyramid_registry): settings = pyramid_registry.settings celery_config['BROKER_URL'] = settings['celery.broker_url'] celery_config['CELERYD_CONCURRENCY'] = settings['celery.concurrency'] celery_config['CELERY_TIMEZONE'] = settings['celery.timezone'] + + notifications_seconds = int(settings.get('tasks.notifications_reports.interval', 60)) + + celery_config['CELERYBEAT_SCHEDULE']['notifications'] = { + 'task': 'appenlight.celery.tasks.notifications_reports', + 'schedule': timedelta(seconds=notifications_seconds) + } + + celery_config['CELERYBEAT_SCHEDULE']['daily_digest'] = { + 'task': 'appenlight.celery.tasks.daily_digest', + 'schedule': crontab(minute=1, hour='4,12,20') + } + if asbool(settings.get('celery.always_eager')): celery_config['CELERY_ALWAYS_EAGER'] = True celery_config['CELERY_EAGER_PROPAGATES_EXCEPTIONS'] = True @@ -122,13 +137,13 @@ def task_prerun_signal(task_id, task, args, kwargs, **kwaargs): env = celery.pyramid env = prepare(registry=env['request'].registry) proper_base_url = env['request'].registry.settings['mailing.app_url'] - tmp_request = Request.blank('/', base_url=proper_base_url) + tmp_req = Request.blank('/', base_url=proper_base_url) # ensure tasks generate url for right domain from config - env['request'].environ['HTTP_HOST'] = tmp_request.environ['HTTP_HOST'] - env['request'].environ['SERVER_PORT'] = tmp_request.environ['SERVER_PORT'] - env['request'].environ['SERVER_NAME'] = tmp_request.environ['SERVER_NAME'] - env['request'].environ['wsgi.url_scheme'] = tmp_request.environ[ - 'wsgi.url_scheme'] + env['request'].environ['HTTP_HOST'] = tmp_req.environ['HTTP_HOST'] + env['request'].environ['SERVER_PORT'] = tmp_req.environ['SERVER_PORT'] + env['request'].environ['SERVER_NAME'] = tmp_req.environ['SERVER_NAME'] + env['request'].environ['wsgi.url_scheme'] = \ + tmp_req.environ['wsgi.url_scheme'] get_current_request().tm.begin() diff --git a/backend/src/appenlight/celery/tasks.py b/backend/src/appenlight/celery/tasks.py index 3ce5dbe..ffbfbb5 100644 --- a/backend/src/appenlight/celery/tasks.py +++ b/backend/src/appenlight/celery/tasks.py @@ -410,7 +410,8 @@ def add_metrics_es(es_docs): @celery.task(queue="default", default_retry_delay=5, max_retries=2) -def check_user_report_notifications(resource_id, since_when=None): +def check_user_report_notifications(resource_id): + since_when = datetime.utcnow() try: request = get_current_request() application = ApplicationService.by_id(resource_id) @@ -442,14 +443,13 @@ def check_user_report_notifications(resource_id, since_when=None): ApplicationService.check_for_groups_alert( application, 'alert', report_groups=report_groups, - occurence_dict=occurence_dict, since_when=since_when) + occurence_dict=occurence_dict) users = set([p.user for p in application.users_for_perm('view')]) report_groups = report_groups.all() for user in users: UserService.report_notify(user, request, application, report_groups=report_groups, - occurence_dict=occurence_dict, - since_when=since_when) + occurence_dict=occurence_dict) for group in report_groups: # marks report_groups as notified if not group.notified: @@ -459,9 +459,53 @@ def check_user_report_notifications(resource_id, since_when=None): raise +@celery.task(queue="default", default_retry_delay=5, max_retries=2) +def check_alerts(resource_id): + since_when = datetime.utcnow() + try: + request = get_current_request() + application = ApplicationService.by_id(resource_id) + if not application: + return + error_key = REDIS_KEYS[ + 'reports_to_notify_per_type_per_app_alerting'].format( + ReportType.error, resource_id) + slow_key = REDIS_KEYS[ + 'reports_to_notify_per_type_per_app_alerting'].format( + ReportType.slow, resource_id) + error_group_ids = Datastores.redis.smembers(error_key) + slow_group_ids = Datastores.redis.smembers(slow_key) + Datastores.redis.delete(error_key) + Datastores.redis.delete(slow_key) + err_gids = [int(g_id) for g_id in error_group_ids] + slow_gids = [int(g_id) for g_id in list(slow_group_ids)] + group_ids = err_gids + slow_gids + occurence_dict = {} + for g_id in group_ids: + key = REDIS_KEYS['counters'][ + 'report_group_occurences_alerting'].format( + g_id) + val = Datastores.redis.get(key) + Datastores.redis.delete(key) + if val: + occurence_dict[g_id] = int(val) + else: + occurence_dict[g_id] = 1 + report_groups = ReportGroupService.by_ids(group_ids) + report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref)) + + ApplicationService.check_for_groups_alert( + application, 'alert', report_groups=report_groups, + occurence_dict=occurence_dict, since_when=since_when) + except Exception as exc: + print_traceback(log) + raise + + @celery.task(queue="default", default_retry_delay=1, max_retries=2) -def close_alerts(since_when=None): +def close_alerts(): log.warning('Checking alerts') + since_when = datetime.utcnow() try: event_types = [Event.types['error_report_alert'], Event.types['slow_report_alert'], ] @@ -541,26 +585,34 @@ def daily_digest(): @celery.task(queue="default") -def alerting(): +def notifications_reports(): """ Loop that checks redis for info and then issues new tasks to celery to - perform the following: - - which applications should have new alerts opened - - which currently opened alerts should be closed + issue notifications """ - start_time = datetime.utcnow() - # transactions are needed for mailer apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports']) Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports']) for app in apps: log.warning('Notify for app: %s' % app) check_user_report_notifications.delay(app.decode('utf8')) - # clear app ids from set - close_alerts.delay(since_when=start_time) + +@celery.task(queue="default") +def alerting_reports(): + """ + Loop that checks redis for info and then issues new tasks to celery to + perform the following: + - which applications should have new alerts opened + """ + + apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting']) + Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting']) + for app in apps: + log.warning('Notify for app: %s' % app) + check_alerts.delay(app.decode('utf8')) -@celery.task(queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4, - max_retries=999) +@celery.task(queue="default", soft_time_limit=3600 * 4, + hard_time_limit=3600 * 4, max_retries=999) def logs_cleanup(resource_id, filter_settings): request = get_current_request() request.tm.begin() diff --git a/backend/src/appenlight/lib/__init__.py b/backend/src/appenlight/lib/__init__.py index 27c5848..7e00317 100644 --- a/backend/src/appenlight/lib/__init__.py +++ b/backend/src/appenlight/lib/__init__.py @@ -30,7 +30,7 @@ from appenlight_client.exceptions import get_current_traceback def generate_random_string(chars=10): return ''.join(random.sample(string.ascii_letters * 2 + string.digits, - chars)) + chars)) def to_integer_safe(input): @@ -39,6 +39,7 @@ def to_integer_safe(input): except (TypeError, ValueError,): return None + def print_traceback(log): traceback = get_current_traceback(skip=1, show_hidden_frames=True, ignore_system_exceptions=True) @@ -47,6 +48,7 @@ def print_traceback(log): log.error(traceback.plaintext) del traceback + def get_callable(import_string): import_module, indexer_callable = import_string.split(':') return getattr(importlib.import_module(import_module), diff --git a/backend/src/appenlight/lib/redis_keys.py b/backend/src/appenlight/lib/redis_keys.py index de0e2a3..45c6fcb 100644 --- a/backend/src/appenlight/lib/redis_keys.py +++ b/backend/src/appenlight/lib/redis_keys.py @@ -38,6 +38,8 @@ REDIS_KEYS = { 'metrics_per_minute_per_app': BASE.format( 'metrics_per_minute_per_app:{}:{}'), 'report_group_occurences': BASE.format('report_group_occurences:{}'), + 'report_group_occurences_alerting': BASE.format( + 'report_group_occurences_alerting:{}'), 'report_group_occurences_10th': BASE.format( 'report_group_occurences_10th:{}'), 'report_group_occurences_100th': BASE.format( @@ -53,7 +55,13 @@ REDIS_KEYS = { }, 'apps_that_had_reports': BASE.format('apps_that_had_reports'), 'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'), + 'apps_that_had_reports_alerting': BASE.format( + 'apps_that_had_reports_alerting'), + 'apps_that_had_error_reports_alerting': BASE.format( + 'apps_that_had_error_reports_alerting'), 'reports_to_notify_per_type_per_app': BASE.format( 'reports_to_notify_per_type_per_app:{}:{}'), + 'reports_to_notify_per_type_per_app_alerting': BASE.format( + 'reports_to_notify_per_type_per_app_alerting:{}:{}'), 'seen_tag_list': BASE.format('seen_tag_list') } diff --git a/backend/src/appenlight/models/report_group.py b/backend/src/appenlight/models/report_group.py index c19bffa..7d08248 100644 --- a/backend/src/appenlight/models/report_group.py +++ b/backend/src/appenlight/models/report_group.py @@ -141,7 +141,7 @@ class ReportGroup(Base, BaseModel): report_dict = report.get_dict(request) # if was not processed yet if (rule_obj.match(report_dict) and - action.pkey not in self.triggered_postprocesses_ids): + action.pkey not in self.triggered_postprocesses_ids): action.postprocess(self) # this way sqla can track mutation of list self.triggered_postprocesses_ids = \ @@ -193,16 +193,25 @@ class ReportGroup(Base, BaseModel): self.report_type, current_time) Datastores.redis.incr(key) Datastores.redis.expire(key, 3600 * 24) - # detailed app notification + # detailed app notification for alerts and notifications Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'], self.resource_id) + Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports_alerting'], + self.resource_id) # only notify for exceptions here if self.report_type == ReportType.error: - Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'], - self.resource_id) + Datastores.redis.sadd( + REDIS_KEYS['apps_that_had_reports'], + self.resource_id) + Datastores.redis.sadd( + REDIS_KEYS['apps_that_had_error_reports_alerting'], + self.resource_id) key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id) Datastores.redis.incr(key) Datastores.redis.expire(key, 3600 * 24) + key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id) + Datastores.redis.incr(key) + Datastores.redis.expire(key, 3600 * 24) if notify_10: key = REDIS_KEYS['counters'][ @@ -217,6 +226,10 @@ class ReportGroup(Base, BaseModel): self.report_type, self.resource_id) Datastores.redis.sadd(key, self.id) Datastores.redis.expire(key, 3600 * 24) + key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format( + self.report_type, self.resource_id) + Datastores.redis.sadd(key, self.id) + Datastores.redis.expire(key, 3600 * 24) @property def partition_id(self): diff --git a/backend/src/appenlight/models/services/user.py b/backend/src/appenlight/models/services/user.py index 6b0ee62..186a01e 100644 --- a/backend/src/appenlight/models/services/user.py +++ b/backend/src/appenlight/models/services/user.py @@ -117,13 +117,11 @@ class UserService(BaseService): @classmethod def report_notify(cls, user, request, application, report_groups, - occurence_dict, since_when=None, db_session=None): + occurence_dict, db_session=None): db_session = get_db_session(db_session) if not report_groups: return True - - if not since_when: - since_when = datetime.utcnow() + since_when = datetime.utcnow() for channel in cls.get_valid_channels(user): confirmed_groups = [] diff --git a/backend/src/appenlight/templates/ini/production.ini.jinja2 b/backend/src/appenlight/templates/ini/production.ini.jinja2 index 68320a8..5d5e34e 100644 --- a/backend/src/appenlight/templates/ini/production.ini.jinja2 +++ b/backend/src/appenlight/templates/ini/production.ini.jinja2 @@ -101,6 +101,11 @@ celery.broker_url = redis://localhost:6379/3 celery.concurrency = 2 celery.timezone = UTC +# tasks + +# how often run alerting tasks (60s default) +tasks.notifications_reports.interval = 60 + [filter:paste_prefix] use = egg:PasteDeploy#prefix @@ -108,6 +113,9 @@ use = egg:PasteDeploy#prefix [filter:appenlight_client] use = egg:appenlight_client appenlight.api_key = + +# appenlight.transport_config = http://127.0.0.1:6543?threaded=1&timeout=5&verify=0 +# by default uses api.appenlight.com server appenlight.transport_config = appenlight.report_local_vars = true appenlight.report_404 = true