From 86095bc85f51dcf8e746896499f16dc29a0e9454 2016-10-11 10:49:03
From: Marcin Lulek
Date: 2016-10-11 10:49:03
Subject: [PATCH] redis: some cleanups and use of pipelines for better performance

---

diff --git a/backend/src/appenlight/celery/tasks.py b/backend/src/appenlight/celery/tasks.py
index 10f073b..3da7927 100644
--- a/backend/src/appenlight/celery/tasks.py
+++ b/backend/src/appenlight/celery/tasks.py
@@ -91,8 +91,8 @@ def test_retry_exception_task():
 
 
 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
-def add_reports(resource_id, params, dataset, environ=None, **kwargs):
-    proto_version = parse_proto(params.get('protocol_version', ''))
+def add_reports(resource_id, request_params, dataset, **kwargs):
+    proto_version = parse_proto(request_params.get('protocol_version', ''))
     current_time = datetime.utcnow().replace(second=0, microsecond=0)
     try:
         # we will store solr docs here for single insert
@@ -194,13 +194,18 @@ def add_reports(resource_id, params, dataset, environ=None, **kwargs):
                                        proto_version)
         log.info(log_msg)
         total_reports = len(dataset)
+        redis_pipeline = Datastores.redis.pipeline(transaction=False)
         key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
-        Datastores.redis.incr(key, total_reports)
-        Datastores.redis.expire(key, 3600 * 24)
-        key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
-            resource_id, current_time)
-        Datastores.redis.incr(key, total_reports)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key, total_reports)
+        redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.incr(key, total_reports)
+        redis_pipeline.expire(key, 3600 * 24 * 7)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_got_new_data_per_hour'],
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.execute()
 
         add_reports_es(es_report_group_docs, es_report_docs)
         add_reports_slow_calls_es(es_slow_calls_docs)
@@ -233,8 +238,8 @@ def add_reports_stats_rows_es(es_docs):
 
 
 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
-def add_logs(resource_id, request, dataset, environ=None, **kwargs):
-    proto_version = request.get('protocol_version')
+def add_logs(resource_id, request_params, dataset, **kwargs):
+    proto_version = request_params.get('protocol_version')
     current_time = datetime.utcnow().replace(second=0, microsecond=0)
 
     try:
@@ -308,13 +313,18 @@ def add_logs(resource_id, request, dataset, environ=None, **kwargs):
                                     proto_version)
         log.info(log_msg)
         # mark_changed(session)
+        redis_pipeline = Datastores.redis.pipeline(transaction=False)
         key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
-        Datastores.redis.incr(key, total_logs)
-        Datastores.redis.expire(key, 3600 * 24)
-        key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
-            resource_id, current_time)
-        Datastores.redis.incr(key, total_logs)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key, total_logs)
+        redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format(
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.incr(key, total_logs)
+        redis_pipeline.expire(key, 3600 * 24 * 7)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_got_new_data_per_hour'],
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.execute()
         add_logs_es(es_docs)
         return True
     except Exception as exc:
@@ -329,7 +339,7 @@ def add_logs_es(es_docs):
 
 
 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
-def add_metrics(resource_id, request, dataset, proto_version):
+def add_metrics(resource_id, request_params, dataset, proto_version):
     current_time = datetime.utcnow().replace(second=0, microsecond=0)
     try:
         application = ApplicationService.by_id_cached()(resource_id)
@@ -361,13 +371,18 @@ def add_metrics(resource_id, request_params, dataset, proto_version):
         log.info(metrics_msg)
 
         mark_changed(session)
+        redis_pipeline = Datastores.redis.pipeline(transaction=False)
         key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
-        Datastores.redis.incr(key, len(rows))
-        Datastores.redis.expire(key, 3600 * 24)
-        key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
-            resource_id, current_time)
-        Datastores.redis.incr(key, len(rows))
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key, len(rows))
+        redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format(
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.incr(key, len(rows))
+        redis_pipeline.expire(key, 3600 * 24 * 7)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_got_new_data_per_hour'],
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.execute()
         add_metrics_es(es_docs)
         return True
     except Exception as exc:
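Throughout tasks.py the change is the same: several independent INCR/EXPIRE/SADD calls become one pipelined batch, so each task pays for a single Redis round trip instead of four to six. A minimal sketch of the pattern, assuming redis-py and a local Redis; the key names are illustrative stand-ins, not the real REDIS_KEYS entries:

# Sketch of the batching pattern above, assuming redis-py and a local
# Redis; key names are illustrative, not the real REDIS_KEYS entries.
from datetime import datetime
import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)
now = datetime.utcnow().replace(second=0, microsecond=0)

# transaction=False pipelines the commands without wrapping them in
# MULTI/EXEC: the goal is fewer round trips, not atomicity, which
# throwaway counters do not need.
pipe = r.pipeline(transaction=False)
key = 'example:reports_per_minute:{}'.format(now)
pipe.incr(key, 10)           # INCRBY is queued, nothing is sent yet
pipe.expire(key, 3600 * 24)  # counters self-clean after a day
pipe.sadd('example:apps_that_got_new_data_per_hour',
          1, str(now.replace(minute=0)))  # both values become set members
print(pipe.execute())        # one round trip, e.g. [10, True, 2]
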
diff --git a/backend/src/appenlight/lib/api.py b/backend/src/appenlight/lib/api.py
index 00c3320..f7d7dd8 100644
--- a/backend/src/appenlight/lib/api.py
+++ b/backend/src/appenlight/lib/api.py
@@ -35,8 +35,11 @@ def rate_limiting(request, resource, section, to_increment=1):
     tsample = datetime.datetime.utcnow().replace(second=0, microsecond=0)
     key = REDIS_KEYS['rate_limits'][section].format(tsample,
                                                     resource.resource_id)
-    current_count = Datastores.redis.incr(key, to_increment)
-    Datastores.redis.expire(key, 3600 * 24)
+    redis_pipeline = request.registry.redis_conn.pipeline()
+    redis_pipeline.incr(key, to_increment)
+    redis_pipeline.expire(key, 3600 * 24)
+    results = redis_pipeline.execute()
+    current_count = results[0]
     config = ConfigService.by_key_and_section(section, 'global')
     limit = config.value if config else 1000
     if current_count > int(limit):
diff --git a/backend/src/appenlight/lib/redis_keys.py b/backend/src/appenlight/lib/redis_keys.py
index 6e0442f..9adb537 100644
--- a/backend/src/appenlight/lib/redis_keys.py
+++ b/backend/src/appenlight/lib/redis_keys.py
@@ -28,15 +28,15 @@ REDIS_KEYS = {
     },
     'counters': {
         'reports_per_minute': BASE.format('reports_per_minute:{}'),
-        'reports_per_minute_per_app': BASE.format(
-            'reports_per_minute_per_app:{}:{}'),
+        'reports_per_hour_per_app': BASE.format(
+            'reports_per_hour_per_app:{}:{}'),
         'reports_per_type': BASE.format('reports_per_type:{}'),
         'logs_per_minute': BASE.format('logs_per_minute:{}'),
-        'logs_per_minute_per_app': BASE.format(
-            'logs_per_minute_per_app:{}:{}'),
+        'logs_per_hour_per_app': BASE.format(
+            'logs_per_hour_per_app:{}:{}'),
         'metrics_per_minute': BASE.format('metrics_per_minute:{}'),
-        'metrics_per_minute_per_app': BASE.format(
-            'metrics_per_minute_per_app:{}:{}'),
+        'metrics_per_hour_per_app': BASE.format(
+            'metrics_per_hour_per_app:{}:{}'),
         'report_group_occurences': BASE.format('report_group_occurences:{}'),
         'report_group_occurences_alerting': BASE.format(
             'report_group_occurences_alerting:{}'),
@@ -53,6 +53,7 @@
         'per_application_metrics_rate_limit': BASE.format(
             'per_application_metrics_rate_limit:{}:{}'),
     },
+    'apps_that_got_new_data_per_hour': BASE.format('apps_that_got_new_data_per_hour'),
    'apps_that_had_reports': BASE.format('apps_that_had_reports'),
    'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
    'apps_that_had_reports_alerting': BASE.format(
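Two things are happening in these hunks. In lib/api.py the INCR result is read back out of the pipeline: execute() returns one reply per queued command, in order, so the post-increment counter value is results[0]. And in redis_keys.py the per-app counters move from minute to hour granularity (with a matching longer TTL of 3600 * 24 * 7 in the tasks): truncating the timestamp with replace(minute=0) makes every event in the same hour land on the same key. A short illustration of the bucketing, with an assumed BASE stand-in for the real prefix defined in redis_keys.py:

# Illustration of the hourly key bucketing; BASE is an assumed
# stand-in for the real prefix defined in redis_keys.py.
from datetime import datetime

BASE = 'example:{}'

# braces inside the argument survive the outer format() call
REPORTS_PER_HOUR_PER_APP = BASE.format('reports_per_hour_per_app:{}:{}')

current_time = datetime.utcnow().replace(second=0, microsecond=0)
hour_bucket = current_time.replace(minute=0)  # truncate to the hour

key = REPORTS_PER_HOUR_PER_APP.format(1, hour_bucket)
print(key)  # e.g. example:reports_per_hour_per_app:1:2016-10-11 10:00:00
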
diff --git a/backend/src/appenlight/models/report_group.py b/backend/src/appenlight/models/report_group.py
index 5de223b..1254899 100644
--- a/backend/src/appenlight/models/report_group.py
+++ b/backend/src/appenlight/models/report_group.py
@@ -191,45 +191,46 @@ class ReportGroup(Base, BaseModel):
         # global app counter
         key = REDIS_KEYS['counters']['reports_per_type'].format(
             self.report_type, current_time)
-        Datastores.redis.incr(key)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline = Datastores.redis.pipeline()
+        redis_pipeline.incr(key)
+        redis_pipeline.expire(key, 3600 * 24)
         # detailed app notification for alerts and notifications
-        Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'],
-                              self.resource_id)
-        Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports_alerting'],
-                              self.resource_id)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_had_reports'], self.resource_id)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_had_reports_alerting'], self.resource_id)
         # only notify for exceptions here
         if self.report_type == ReportType.error:
-            Datastores.redis.sadd(
-                REDIS_KEYS['apps_that_had_reports'],
-                self.resource_id)
-            Datastores.redis.sadd(
+            redis_pipeline.sadd(
+                REDIS_KEYS['apps_that_had_reports'], self.resource_id)
+            redis_pipeline.sadd(
                 REDIS_KEYS['apps_that_had_error_reports_alerting'],
                 self.resource_id)
         key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
-        Datastores.redis.incr(key)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key)
+        redis_pipeline.expire(key, 3600 * 24)
         key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id)
-        Datastores.redis.incr(key)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key)
+        redis_pipeline.expire(key, 3600 * 24)
 
         if notify_10:
             key = REDIS_KEYS['counters'][
                 'report_group_occurences_10th'].format(self.id)
-            Datastores.redis.setex(key, 3600 * 24, 1)
+            redis_pipeline.setex(key, 3600 * 24, 1)
         if notify_100:
             key = REDIS_KEYS['counters'][
                 'report_group_occurences_100th'].format(self.id)
-            Datastores.redis.setex(key, 3600 * 24, 1)
+            redis_pipeline.setex(key, 3600 * 24, 1)
 
         key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
             self.report_type, self.resource_id)
-        Datastores.redis.sadd(key, self.id)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.sadd(key, self.id)
+        redis_pipeline.expire(key, 3600 * 24)
         key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format(
             self.report_type, self.resource_id)
-        Datastores.redis.sadd(key, self.id)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.sadd(key, self.id)
+        redis_pipeline.expire(key, 3600 * 24)
+        redis_pipeline.execute()
 
     @property
     def partition_id(self):
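One detail worth noting in this hunk: pipeline commands are only buffered until execute(), so the conditionally queued SETEX milestone flags ride along in the same batch as the counters. SETEX writes the value and its TTL in a single command, which is what makes the "10th/100th occurrence" flag a one-shot: it expires on its own. A sketch of that idea, assuming redis-py; the group id and key name are made up for illustration:

# Sketch of the one-shot milestone flag, assuming redis-py; the group
# id and key name are made up for illustration.
import redis

r = redis.StrictRedis()
group_id = 1234
notify_10 = True  # stand-in for the real 10th-occurrence check

pipe = r.pipeline()
if notify_10:
    key = 'example:report_group_occurences_10th:{}'.format(group_id)
    # value 1 with a 24h TTL in one command; once it expires, the
    # alert may fire again, so it fires at most once per day per group
    pipe.setex(key, 3600 * 24, 1)
pipe.execute()

# downstream alerting can test the flag cheaply:
print(r.exists(key))  # 1 while the flag is alive
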
diff --git a/backend/src/appenlight/views/admin/admin.py b/backend/src/appenlight/views/admin/admin.py
index 193ca34..c66fd72 100644
--- a/backend/src/appenlight/views/admin/admin.py
+++ b/backend/src/appenlight/views/admin/admin.py
@@ -56,14 +56,13 @@ def system(request):
     current_time = datetime.utcnow(). \
         replace(second=0, microsecond=0) - timedelta(minutes=1)
     # global app counter
-
-    processed_reports = Datastores.redis.get(
+    processed_reports = request.registry.redis_conn.get(
         REDIS_KEYS['counters']['reports_per_minute'].format(current_time))
     processed_reports = int(processed_reports) if processed_reports else 0
-    processed_logs = Datastores.redis.get(
+    processed_logs = request.registry.redis_conn.get(
         REDIS_KEYS['counters']['logs_per_minute'].format(current_time))
     processed_logs = int(processed_logs) if processed_logs else 0
-    processed_metrics = Datastores.redis.get(
+    processed_metrics = request.registry.redis_conn.get(
         REDIS_KEYS['counters']['metrics_per_minute'].format(current_time))
     processed_metrics = int(processed_metrics) if processed_metrics else 0
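The admin view reads the minute counters straight back with GET, looking one full minute behind real time so it only ever reads a completed bucket. A minimal sketch of that read-back, assuming redis-py and an illustrative key name:

# Sketch of the dashboard read-back pattern, assuming redis-py; the
# key name is illustrative. GET returns None when the minute bucket
# never saw traffic (or has already expired) and bytes otherwise,
# hence the `int(x) if x else 0` guard.
from datetime import datetime, timedelta
import redis

r = redis.StrictRedis()

# look one whole minute back so the bucket is complete
current_time = datetime.utcnow() \
    .replace(second=0, microsecond=0) - timedelta(minutes=1)

raw = r.get('example:reports_per_minute:{}'.format(current_time))
processed_reports = int(raw) if raw else 0
print(processed_reports)  # 0 when the key is missing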