##// END OF EJS Templates
redis: some cleanups and use of pipelines for better performance
ergo -
Show More
@@ -91,8 +91,8 b' def test_retry_exception_task():'
91
91
92
92
93 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
93 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
94 def add_reports(resource_id, params, dataset, environ=None, **kwargs):
94 def add_reports(resource_id, request_params, dataset, **kwargs):
95 proto_version = parse_proto(params.get('protocol_version', ''))
95 proto_version = parse_proto(request_params.get('protocol_version', ''))
96 current_time = datetime.utcnow().replace(second=0, microsecond=0)
96 current_time = datetime.utcnow().replace(second=0, microsecond=0)
97 try:
97 try:
98 # we will store solr docs here for single insert
98 # we will store solr docs here for single insert
@@ -194,13 +194,18 b' def add_reports(resource_id, params, dataset, environ=None, **kwargs):'
194 proto_version)
194 proto_version)
195 log.info(log_msg)
195 log.info(log_msg)
196 total_reports = len(dataset)
196 total_reports = len(dataset)
197 redis_pipeline = Datastores.redis.pipeline(transaction=False)
197 key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
198 key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
198 Datastores.redis.incr(key, total_reports)
199 redis_pipeline.incr(key, total_reports)
199 Datastores.redis.expire(key, 3600 * 24)
200 redis_pipeline.expire(key, 3600 * 24)
200 key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
201 key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
201 resource_id, current_time)
202 resource_id, current_time.replace(minute=0))
202 Datastores.redis.incr(key, total_reports)
203 redis_pipeline.incr(key, total_reports)
203 Datastores.redis.expire(key, 3600 * 24)
204 redis_pipeline.expire(key, 3600 * 24 * 7)
205 redis_pipeline.sadd(
206 REDIS_KEYS['apps_that_got_new_data_per_hour'],
207 resource_id, current_time.replace(minute=0))
208 redis_pipeline.execute()
204
209
205 add_reports_es(es_report_group_docs, es_report_docs)
210 add_reports_es(es_report_group_docs, es_report_docs)
206 add_reports_slow_calls_es(es_slow_calls_docs)
211 add_reports_slow_calls_es(es_slow_calls_docs)
@@ -233,8 +238,8 b' def add_reports_stats_rows_es(es_docs):'
233
238
234
239
235 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
240 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
236 def add_logs(resource_id, request, dataset, environ=None, **kwargs):
241 def add_logs(resource_id, request_params, dataset, **kwargs):
237 proto_version = request.get('protocol_version')
242 proto_version = request_params.get('protocol_version')
238 current_time = datetime.utcnow().replace(second=0, microsecond=0)
243 current_time = datetime.utcnow().replace(second=0, microsecond=0)
239
244
240 try:
245 try:
@@ -308,13 +313,18 b' def add_logs(resource_id, request, dataset, environ=None, **kwargs):'
308 proto_version)
313 proto_version)
309 log.info(log_msg)
314 log.info(log_msg)
310 # mark_changed(session)
315 # mark_changed(session)
316 redis_pipeline = Datastores.redis.pipeline(transaction=False)
311 key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
317 key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
312 Datastores.redis.incr(key, total_logs)
318 redis_pipeline.incr(key, total_logs)
313 Datastores.redis.expire(key, 3600 * 24)
319 redis_pipeline.expire(key, 3600 * 24)
314 key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
320 key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format(
315 resource_id, current_time)
321 resource_id, current_time.replace(minute=0))
316 Datastores.redis.incr(key, total_logs)
322 redis_pipeline.incr(key, total_logs)
317 Datastores.redis.expire(key, 3600 * 24)
323 redis_pipeline.expire(key, 3600 * 24 * 7)
324 redis_pipeline.sadd(
325 REDIS_KEYS['apps_that_got_new_data_per_hour'],
326 resource_id, current_time.replace(minute=0))
327 redis_pipeline.execute()
318 add_logs_es(es_docs)
328 add_logs_es(es_docs)
319 return True
329 return True
320 except Exception as exc:
330 except Exception as exc:
@@ -329,7 +339,7 b' def add_logs_es(es_docs):'
329
339
330
340
331 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
341 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
332 def add_metrics(resource_id, request, dataset, proto_version):
342 def add_metrics(resource_id, request_params, dataset, proto_version):
333 current_time = datetime.utcnow().replace(second=0, microsecond=0)
343 current_time = datetime.utcnow().replace(second=0, microsecond=0)
334 try:
344 try:
335 application = ApplicationService.by_id_cached()(resource_id)
345 application = ApplicationService.by_id_cached()(resource_id)
@@ -361,13 +371,18 b' def add_metrics(resource_id, request, dataset, proto_version):'
361 log.info(metrics_msg)
371 log.info(metrics_msg)
362
372
363 mark_changed(session)
373 mark_changed(session)
374 redis_pipeline = Datastores.redis.pipeline(transaction=False)
364 key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
375 key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
365 Datastores.redis.incr(key, len(rows))
376 redis_pipeline.incr(key, len(rows))
366 Datastores.redis.expire(key, 3600 * 24)
377 redis_pipeline.expire(key, 3600 * 24)
367 key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
378 key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format(
368 resource_id, current_time)
379 resource_id, current_time.replace(minute=0))
369 Datastores.redis.incr(key, len(rows))
380 redis_pipeline.incr(key, len(rows))
370 Datastores.redis.expire(key, 3600 * 24)
381 redis_pipeline.expire(key, 3600 * 24 * 7)
382 redis_pipeline.sadd(
383 REDIS_KEYS['apps_that_got_new_data_per_hour'],
384 resource_id, current_time.replace(minute=0))
385 redis_pipeline.execute()
371 add_metrics_es(es_docs)
386 add_metrics_es(es_docs)
372 return True
387 return True
373 except Exception as exc:
388 except Exception as exc:
@@ -35,8 +35,11 b' def rate_limiting(request, resource, section, to_increment=1):'
35 tsample = datetime.datetime.utcnow().replace(second=0, microsecond=0)
35 tsample = datetime.datetime.utcnow().replace(second=0, microsecond=0)
36 key = REDIS_KEYS['rate_limits'][section].format(tsample,
36 key = REDIS_KEYS['rate_limits'][section].format(tsample,
37 resource.resource_id)
37 resource.resource_id)
38 current_count = Datastores.redis.incr(key, to_increment)
38 redis_pipeline = request.registry.redis_conn.pipeline()
39 Datastores.redis.expire(key, 3600 * 24)
39 redis_pipeline.incr(key, to_increment)
40 redis_pipeline.expire(key, 3600 * 24)
41 results = redis_pipeline.execute()
42 current_count = results[0]
40 config = ConfigService.by_key_and_section(section, 'global')
43 config = ConfigService.by_key_and_section(section, 'global')
41 limit = config.value if config else 1000
44 limit = config.value if config else 1000
42 if current_count > int(limit):
45 if current_count > int(limit):
@@ -28,15 +28,15 b' REDIS_KEYS = {'
28 },
28 },
29 'counters': {
29 'counters': {
30 'reports_per_minute': BASE.format('reports_per_minute:{}'),
30 'reports_per_minute': BASE.format('reports_per_minute:{}'),
31 'reports_per_minute_per_app': BASE.format(
31 'reports_per_hour_per_app': BASE.format(
32 'reports_per_minute_per_app:{}:{}'),
32 'reports_per_hour_per_app:{}:{}'),
33 'reports_per_type': BASE.format('reports_per_type:{}'),
33 'reports_per_type': BASE.format('reports_per_type:{}'),
34 'logs_per_minute': BASE.format('logs_per_minute:{}'),
34 'logs_per_minute': BASE.format('logs_per_minute:{}'),
35 'logs_per_minute_per_app': BASE.format(
35 'logs_per_hour_per_app': BASE.format(
36 'logs_per_minute_per_app:{}:{}'),
36 'logs_per_hour_per_app:{}:{}'),
37 'metrics_per_minute': BASE.format('metrics_per_minute:{}'),
37 'metrics_per_minute': BASE.format('metrics_per_minute:{}'),
38 'metrics_per_minute_per_app': BASE.format(
38 'metrics_per_hour_per_app': BASE.format(
39 'metrics_per_minute_per_app:{}:{}'),
39 'metrics_per_hour_per_app:{}:{}'),
40 'report_group_occurences': BASE.format('report_group_occurences:{}'),
40 'report_group_occurences': BASE.format('report_group_occurences:{}'),
41 'report_group_occurences_alerting': BASE.format(
41 'report_group_occurences_alerting': BASE.format(
42 'report_group_occurences_alerting:{}'),
42 'report_group_occurences_alerting:{}'),
@@ -53,6 +53,7 b' REDIS_KEYS = {'
53 'per_application_metrics_rate_limit': BASE.format(
53 'per_application_metrics_rate_limit': BASE.format(
54 'per_application_metrics_rate_limit:{}:{}'),
54 'per_application_metrics_rate_limit:{}:{}'),
55 },
55 },
56 'apps_that_got_new_data_per_hour': BASE.format('apps_that_got_new_data_per_hour'),
56 'apps_that_had_reports': BASE.format('apps_that_had_reports'),
57 'apps_that_had_reports': BASE.format('apps_that_had_reports'),
57 'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
58 'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
58 'apps_that_had_reports_alerting': BASE.format(
59 'apps_that_had_reports_alerting': BASE.format(
@@ -191,45 +191,46 b' class ReportGroup(Base, BaseModel):'
191 # global app counter
191 # global app counter
192 key = REDIS_KEYS['counters']['reports_per_type'].format(
192 key = REDIS_KEYS['counters']['reports_per_type'].format(
193 self.report_type, current_time)
193 self.report_type, current_time)
194 Datastores.redis.incr(key)
194 redis_pipeline = Datastores.redis.pipeline()
195 Datastores.redis.expire(key, 3600 * 24)
195 redis_pipeline.incr(key)
196 redis_pipeline.expire(key, 3600 * 24)
196 # detailed app notification for alerts and notifications
197 # detailed app notification for alerts and notifications
197 Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'],
198 redis_pipeline.sadd(
198 self.resource_id)
199 REDIS_KEYS['apps_that_had_reports'], self.resource_id)
199 Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports_alerting'],
200 redis_pipeline.sadd(
200 self.resource_id)
201 REDIS_KEYS['apps_that_had_reports_alerting'], self.resource_id)
201 # only notify for exceptions here
202 # only notify for exceptions here
202 if self.report_type == ReportType.error:
203 if self.report_type == ReportType.error:
203 Datastores.redis.sadd(
204 redis_pipeline.sadd(
204 REDIS_KEYS['apps_that_had_reports'],
205 REDIS_KEYS['apps_that_had_reports'], self.resource_id)
205 self.resource_id)
206 redis_pipeline.sadd(
206 Datastores.redis.sadd(
207 REDIS_KEYS['apps_that_had_error_reports_alerting'],
207 REDIS_KEYS['apps_that_had_error_reports_alerting'],
208 self.resource_id)
208 self.resource_id)
209 key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
209 key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
210 Datastores.redis.incr(key)
210 redis_pipeline.incr(key)
211 Datastores.redis.expire(key, 3600 * 24)
211 redis_pipeline.expire(key, 3600 * 24)
212 key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id)
212 key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id)
213 Datastores.redis.incr(key)
213 redis_pipeline.incr(key)
214 Datastores.redis.expire(key, 3600 * 24)
214 redis_pipeline.expire(key, 3600 * 24)
215
215
216 if notify_10:
216 if notify_10:
217 key = REDIS_KEYS['counters'][
217 key = REDIS_KEYS['counters'][
218 'report_group_occurences_10th'].format(self.id)
218 'report_group_occurences_10th'].format(self.id)
219 Datastores.redis.setex(key, 3600 * 24, 1)
219 redis_pipeline.setex(key, 3600 * 24, 1)
220 if notify_100:
220 if notify_100:
221 key = REDIS_KEYS['counters'][
221 key = REDIS_KEYS['counters'][
222 'report_group_occurences_100th'].format(self.id)
222 'report_group_occurences_100th'].format(self.id)
223 Datastores.redis.setex(key, 3600 * 24, 1)
223 redis_pipeline.setex(key, 3600 * 24, 1)
224
224
225 key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
225 key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
226 self.report_type, self.resource_id)
226 self.report_type, self.resource_id)
227 Datastores.redis.sadd(key, self.id)
227 redis_pipeline.sadd(key, self.id)
228 Datastores.redis.expire(key, 3600 * 24)
228 redis_pipeline.expire(key, 3600 * 24)
229 key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format(
229 key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format(
230 self.report_type, self.resource_id)
230 self.report_type, self.resource_id)
231 Datastores.redis.sadd(key, self.id)
231 redis_pipeline.sadd(key, self.id)
232 Datastores.redis.expire(key, 3600 * 24)
232 redis_pipeline.expire(key, 3600 * 24)
233 redis_pipeline.execute()
233
234
234 @property
235 @property
235 def partition_id(self):
236 def partition_id(self):
@@ -56,14 +56,13 b' def system(request):'
56 current_time = datetime.utcnow(). \
56 current_time = datetime.utcnow(). \
57 replace(second=0, microsecond=0) - timedelta(minutes=1)
57 replace(second=0, microsecond=0) - timedelta(minutes=1)
58 # global app counter
58 # global app counter
59
59 processed_reports = request.registry.redis_conn.get(
60 processed_reports = Datastores.redis.get(
61 REDIS_KEYS['counters']['reports_per_minute'].format(current_time))
60 REDIS_KEYS['counters']['reports_per_minute'].format(current_time))
62 processed_reports = int(processed_reports) if processed_reports else 0
61 processed_reports = int(processed_reports) if processed_reports else 0
63 processed_logs = Datastores.redis.get(
62 processed_logs = request.registry.redis_conn.get(
64 REDIS_KEYS['counters']['logs_per_minute'].format(current_time))
63 REDIS_KEYS['counters']['logs_per_minute'].format(current_time))
65 processed_logs = int(processed_logs) if processed_logs else 0
64 processed_logs = int(processed_logs) if processed_logs else 0
66 processed_metrics = Datastores.redis.get(
65 processed_metrics = request.registry.redis_conn.get(
67 REDIS_KEYS['counters']['metrics_per_minute'].format(current_time))
66 REDIS_KEYS['counters']['metrics_per_minute'].format(current_time))
68 processed_metrics = int(processed_metrics) if processed_metrics else 0
67 processed_metrics = int(processed_metrics) if processed_metrics else 0
69
68
General Comments 0
You need to be logged in to leave comments. Login now