redis: some cleanups and use of pipelines for better performance
@@ -91,8 +91,8 @@ def test_retry_exception_task():
 
 
 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
-def add_reports(resource_id, params, dataset, environ=None, **kwargs):
-    proto_version = parse_proto(params.get('protocol_version', ''))
+def add_reports(resource_id, request_params, dataset, **kwargs):
+    proto_version = parse_proto(request_params.get('protocol_version', ''))
     current_time = datetime.utcnow().replace(second=0, microsecond=0)
     try:
         # we will store solr docs here for single insert
@@ -194,13 +194,18 @@ def add_reports(resource_id, params, dataset, environ=None, **kwargs):
                      proto_version)
         log.info(log_msg)
         total_reports = len(dataset)
+        redis_pipeline = Datastores.redis.pipeline(transaction=False)
         key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
-        Datastores.redis.incr(key, total_reports)
-        Datastores.redis.expire(key, 3600 * 24)
-        key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
-            resource_id, current_time)
-        Datastores.redis.incr(key, total_reports)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key, total_reports)
+        redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.incr(key, total_reports)
+        redis_pipeline.expire(key, 3600 * 24 * 7)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_got_new_data_per_hour'],
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.execute()
 
         add_reports_es(es_report_group_docs, es_report_docs)
         add_reports_slow_calls_es(es_slow_calls_docs)
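The hunk above is the core of the commit: instead of paying a network round trip for every INCR/EXPIRE/SADD, the commands are queued on a client-side pipeline and flushed with a single execute(). A minimal sketch of the same pattern, assuming redis-py and illustrative key names rather than the project's actual configuration:

import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

def bump_report_counters(resource_id, total_reports, current_time):
    # transaction=False skips MULTI/EXEC wrapping; only batching is wanted here
    pipe = r.pipeline(transaction=False)
    minute_key = 'reports_per_minute:{}'.format(current_time)
    pipe.incr(minute_key, total_reports)
    pipe.expire(minute_key, 3600 * 24)
    hour_key = 'reports_per_hour_per_app:{}:{}'.format(
        resource_id, current_time.replace(minute=0))
    pipe.incr(hour_key, total_reports)
    pipe.expire(hour_key, 3600 * 24 * 7)
    # every queued command goes to the server in one round trip
    pipe.execute()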
@@ -233,8 +238,8 @@ def add_reports_stats_rows_es(es_docs):
 
 
 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
-def add_logs(resource_id, request, dataset, environ=None, **kwargs):
-    proto_version = request.get('protocol_version')
+def add_logs(resource_id, request_params, dataset, **kwargs):
+    proto_version = request_params.get('protocol_version')
     current_time = datetime.utcnow().replace(second=0, microsecond=0)
 
     try:
@@ -308,13 +313,18 @@ def add_logs(resource_id, request, dataset, environ=None, **kwargs):
                    proto_version)
         log.info(log_msg)
         # mark_changed(session)
+        redis_pipeline = Datastores.redis.pipeline(transaction=False)
         key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
-        Datastores.redis.incr(key, total_logs)
-        Datastores.redis.expire(key, 3600 * 24)
-        key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
-            resource_id, current_time)
-        Datastores.redis.incr(key, total_logs)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key, total_logs)
+        redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format(
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.incr(key, total_logs)
+        redis_pipeline.expire(key, 3600 * 24 * 7)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_got_new_data_per_hour'],
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.execute()
         add_logs_es(es_docs)
         return True
     except Exception as exc:
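One redis-py detail worth noting in the sadd call above: every argument after the key becomes its own set member, so resource_id and the hour bucket land in apps_that_got_new_data_per_hour as two separate members, not as one pair. A small demonstration (key name shortened for the example):

import redis

r = redis.StrictRedis()
pipe = r.pipeline(transaction=False)
pipe.sadd('demo:apps_with_new_data', 51, '2016-01-01 10:00:00')
pipe.execute()
print(r.smembers('demo:apps_with_new_data'))
# -> {b'51', b'2016-01-01 10:00:00'} - two members, not one tuple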
@@ -329,7 +339,7 @@ def add_logs_es(es_docs):
 
 
 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
-def add_metrics(resource_id, request, dataset, proto_version):
+def add_metrics(resource_id, request_params, dataset, proto_version):
     current_time = datetime.utcnow().replace(second=0, microsecond=0)
     try:
         application = ApplicationService.by_id_cached()(resource_id)
@@ -361,13 +371,18 @@ def add_metrics(resource_id, request, dataset, proto_version):
         log.info(metrics_msg)
 
         mark_changed(session)
+        redis_pipeline = Datastores.redis.pipeline(transaction=False)
         key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
-        Datastores.redis.incr(key, len(rows))
-        Datastores.redis.expire(key, 3600 * 24)
-        key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
-            resource_id, current_time)
-        Datastores.redis.incr(key, len(rows))
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key, len(rows))
+        redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format(
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.incr(key, len(rows))
+        redis_pipeline.expire(key, 3600 * 24 * 7)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_got_new_data_per_hour'],
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.execute()
         add_metrics_es(es_docs)
         return True
     except Exception as exc:
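The same pipeline pattern is applied to all three tasks. How much the batching actually saves depends entirely on the latency between the worker and Redis; a rough, self-contained way to measure it (assuming redis-py against a local server):

import time
import redis

r = redis.StrictRedis()

start = time.perf_counter()
for _ in range(1000):
    r.incr('bench:individual')        # one round trip per command
individual = time.perf_counter() - start

start = time.perf_counter()
pipe = r.pipeline(transaction=False)
for _ in range(1000):
    pipe.incr('bench:pipelined')      # queued client-side
pipe.execute()                        # single round trip
pipelined = time.perf_counter() - start

print('individual: %.4fs  pipelined: %.4fs' % (individual, pipelined))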
@@ -35,8 +35,11 @@ def rate_limiting(request, resource, section, to_increment=1):
     tsample = datetime.datetime.utcnow().replace(second=0, microsecond=0)
     key = REDIS_KEYS['rate_limits'][section].format(tsample,
                                                     resource.resource_id)
-    current_count = Datastores.redis.incr(key, to_increment)
-    Datastores.redis.expire(key, 3600 * 24)
+    redis_pipeline = request.registry.redis_conn.pipeline()
+    redis_pipeline.incr(key, to_increment)
+    redis_pipeline.expire(key, 3600 * 24)
+    results = redis_pipeline.execute()
+    current_count = results[0]
     config = ConfigService.by_key_and_section(section, 'global')
     limit = config.value if config else 1000
    if current_count > int(limit):
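Unlike the celery tasks, the rate limiter needs a command's reply: execute() returns one result per queued command, in queue order, so results[0] is the INCR reply and the limit check still works. A stripped-down sketch of that shape (names are illustrative):

import redis

r = redis.StrictRedis()

def over_limit(key, to_increment, limit):
    pipe = r.pipeline()
    pipe.incr(key, to_increment)  # reply will be results[0]
    pipe.expire(key, 3600 * 24)   # reply will be results[1]
    results = pipe.execute()
    current_count = results[0]
    return current_count > int(limit)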
@@ -28,15 +28,15 @@ REDIS_KEYS = {
     },
     'counters': {
         'reports_per_minute': BASE.format('reports_per_minute:{}'),
-        'reports_per_minute_per_app': BASE.format(
-            'reports_per_minute_per_app:{}:{}'),
+        'reports_per_hour_per_app': BASE.format(
+            'reports_per_hour_per_app:{}:{}'),
         'reports_per_type': BASE.format('reports_per_type:{}'),
         'logs_per_minute': BASE.format('logs_per_minute:{}'),
-        'logs_per_minute_per_app': BASE.format(
-            'logs_per_minute_per_app:{}:{}'),
+        'logs_per_hour_per_app': BASE.format(
+            'logs_per_hour_per_app:{}:{}'),
         'metrics_per_minute': BASE.format('metrics_per_minute:{}'),
-        'metrics_per_minute_per_app': BASE.format(
-            'metrics_per_minute_per_app:{}:{}'),
+        'metrics_per_hour_per_app': BASE.format(
+            'metrics_per_hour_per_app:{}:{}'),
         'report_group_occurences': BASE.format('report_group_occurences:{}'),
         'report_group_occurences_alerting': BASE.format(
             'report_group_occurences_alerting:{}'),
@@ -53,6 +53,7 @@ REDIS_KEYS = {
         'per_application_metrics_rate_limit': BASE.format(
             'per_application_metrics_rate_limit:{}:{}'),
     },
+    'apps_that_got_new_data_per_hour': BASE.format('apps_that_got_new_data_per_hour'),
    'apps_that_had_reports': BASE.format('apps_that_had_reports'),
     'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
     'apps_that_had_reports_alerting': BASE.format(
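For readers unfamiliar with this module: BASE is a prefix template applied once at definition time, and the remaining {} placeholders are filled at call sites. Assuming a prefix along the lines of 'appenlight:data:{}' (the real value is defined elsewhere in this file), the renamed key expands like this:

BASE = 'appenlight:data:{}'  # assumed value, for illustration only
template = BASE.format('reports_per_hour_per_app:{}:{}')
key = template.format(51, '2016-01-01 10:00:00')
# -> 'appenlight:data:reports_per_hour_per_app:51:2016-01-01 10:00:00'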
@@ -191,45 +191,46 @@ class ReportGroup(Base, BaseModel):
         # global app counter
         key = REDIS_KEYS['counters']['reports_per_type'].format(
             self.report_type, current_time)
-        Datastores.redis.incr(key)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline = Datastores.redis.pipeline()
+        redis_pipeline.incr(key)
+        redis_pipeline.expire(key, 3600 * 24)
         # detailed app notification for alerts and notifications
-        Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'],
-                              self.resource_id)
-        Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports_alerting'],
-                              self.resource_id)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_had_reports'], self.resource_id)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_had_reports_alerting'], self.resource_id)
         # only notify for exceptions here
         if self.report_type == ReportType.error:
-            Datastores.redis.sadd(
-                REDIS_KEYS['apps_that_had_reports'],
-                self.resource_id)
-            Datastores.redis.sadd(
+            redis_pipeline.sadd(
+                REDIS_KEYS['apps_that_had_reports'], self.resource_id)
+            redis_pipeline.sadd(
                 REDIS_KEYS['apps_that_had_error_reports_alerting'],
                 self.resource_id)
         key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
-        Datastores.redis.incr(key)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key)
+        redis_pipeline.expire(key, 3600 * 24)
         key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id)
-        Datastores.redis.incr(key)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key)
+        redis_pipeline.expire(key, 3600 * 24)
 
         if notify_10:
             key = REDIS_KEYS['counters'][
                 'report_group_occurences_10th'].format(self.id)
-            Datastores.redis.setex(key, 3600 * 24, 1)
+            redis_pipeline.setex(key, 3600 * 24, 1)
         if notify_100:
             key = REDIS_KEYS['counters'][
                 'report_group_occurences_100th'].format(self.id)
-            Datastores.redis.setex(key, 3600 * 24, 1)
+            redis_pipeline.setex(key, 3600 * 24, 1)
 
         key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
             self.report_type, self.resource_id)
-        Datastores.redis.sadd(key, self.id)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.sadd(key, self.id)
+        redis_pipeline.expire(key, 3600 * 24)
         key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format(
             self.report_type, self.resource_id)
-        Datastores.redis.sadd(key, self.id)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.sadd(key, self.id)
+        redis_pipeline.expire(key, 3600 * 24)
+        redis_pipeline.execute()
 
     @property
     def partition_id(self):
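Note that this pipeline is created without transaction=False, so redis-py applies its default and wraps the queued commands in MULTI/EXEC: the counters and notification sets are updated atomically as well as in one round trip, unlike the fire-and-forget batches in the celery tasks. The two modes side by side:

import redis

r = redis.StrictRedis()

atomic_pipe = r.pipeline()                    # default: MULTI ... EXEC
batched_pipe = r.pipeline(transaction=False)  # plain batching, no MULTI/EXEC

Whether the atomicity here is deliberate or simply the default is not stated in the commit message.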
@@ -56,14 +56,13 @@ def system(request):
     current_time = datetime.utcnow(). \
         replace(second=0, microsecond=0) - timedelta(minutes=1)
     # global app counter
-
-    processed_reports = Datastores.redis.get(
+    processed_reports = request.registry.redis_conn.get(
         REDIS_KEYS['counters']['reports_per_minute'].format(current_time))
     processed_reports = int(processed_reports) if processed_reports else 0
-    processed_logs = Datastores.redis.get(
+    processed_logs = request.registry.redis_conn.get(
         REDIS_KEYS['counters']['logs_per_minute'].format(current_time))
     processed_logs = int(processed_logs) if processed_logs else 0
-    processed_metrics = Datastores.redis.get(
+    processed_metrics = request.registry.redis_conn.get(
         REDIS_KEYS['counters']['metrics_per_minute'].format(current_time))
     processed_metrics = int(processed_metrics) if processed_metrics else 0
 
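The reads in this view were only repointed at request.registry.redis_conn; they still issue one GET per counter. If these ever show up in profiles, the three keys could be fetched in a single round trip with MGET. A hypothetical variant, not part of this changeset:

keys = [
    REDIS_KEYS['counters']['reports_per_minute'].format(current_time),
    REDIS_KEYS['counters']['logs_per_minute'].format(current_time),
    REDIS_KEYS['counters']['metrics_per_minute'].format(current_time),
]
values = request.registry.redis_conn.mget(keys)  # one round trip for all three
reports, logs, metrics = [int(v) if v else 0 for v in values]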