Show More
@@ -91,8 +91,8 b' def test_retry_exception_task():' | |||
|
91 | 91 | |
|
92 | 92 | |
|
93 | 93 | @celery.task(queue="reports", default_retry_delay=600, max_retries=144) |
|
94 |
def add_reports(resource_id, params, dataset |
|
|
95 | proto_version = parse_proto(params.get('protocol_version', '')) | |
|
94 | def add_reports(resource_id, request_params, dataset, **kwargs): | |
|
95 | proto_version = parse_proto(request_params.get('protocol_version', '')) | |
|
96 | 96 | current_time = datetime.utcnow().replace(second=0, microsecond=0) |
|
97 | 97 | try: |
|
98 | 98 | # we will store solr docs here for single insert |
@@ -194,13 +194,18 b' def add_reports(resource_id, params, dataset, environ=None, **kwargs):' | |||
|
194 | 194 | proto_version) |
|
195 | 195 | log.info(log_msg) |
|
196 | 196 | total_reports = len(dataset) |
|
197 | redis_pipeline = Datastores.redis.pipeline(transaction=False) | |
|
197 | 198 | key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time) |
|
198 |
|
|
|
199 |
|
|
|
200 |
key = REDIS_KEYS['counters']['reports_per_ |
|
|
201 | resource_id, current_time) | |
|
202 |
|
|
|
203 |
|
|
|
199 | redis_pipeline.incr(key, total_reports) | |
|
200 | redis_pipeline.expire(key, 3600 * 24) | |
|
201 | key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format( | |
|
202 | resource_id, current_time.replace(minute=0)) | |
|
203 | redis_pipeline.incr(key, total_reports) | |
|
204 | redis_pipeline.expire(key, 3600 * 24 * 7) | |
|
205 | redis_pipeline.sadd( | |
|
206 | REDIS_KEYS['apps_that_got_new_data_per_hour'], | |
|
207 | resource_id, current_time.replace(minute=0)) | |
|
208 | redis_pipeline.execute() | |
|
204 | 209 | |
|
205 | 210 | add_reports_es(es_report_group_docs, es_report_docs) |
|
206 | 211 | add_reports_slow_calls_es(es_slow_calls_docs) |
@@ -233,8 +238,8 b' def add_reports_stats_rows_es(es_docs):' | |||
|
233 | 238 | |
|
234 | 239 | |
|
235 | 240 | @celery.task(queue="logs", default_retry_delay=600, max_retries=144) |
|
236 |
def add_logs(resource_id, request, dataset |
|
|
237 | proto_version = request.get('protocol_version') | |
|
241 | def add_logs(resource_id, request_params, dataset, **kwargs): | |
|
242 | proto_version = request_params.get('protocol_version') | |
|
238 | 243 | current_time = datetime.utcnow().replace(second=0, microsecond=0) |
|
239 | 244 | |
|
240 | 245 | try: |
@@ -308,13 +313,18 b' def add_logs(resource_id, request, dataset, environ=None, **kwargs):' | |||
|
308 | 313 | proto_version) |
|
309 | 314 | log.info(log_msg) |
|
310 | 315 | # mark_changed(session) |
|
316 | redis_pipeline = Datastores.redis.pipeline(transaction=False) | |
|
311 | 317 | key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time) |
|
312 |
|
|
|
313 |
|
|
|
314 |
key = REDIS_KEYS['counters']['logs_per_ |
|
|
315 | resource_id, current_time) | |
|
316 |
|
|
|
317 |
|
|
|
318 | redis_pipeline.incr(key, total_logs) | |
|
319 | redis_pipeline.expire(key, 3600 * 24) | |
|
320 | key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format( | |
|
321 | resource_id, current_time.replace(minute=0)) | |
|
322 | redis_pipeline.incr(key, total_logs) | |
|
323 | redis_pipeline.expire(key, 3600 * 24 * 7) | |
|
324 | redis_pipeline.sadd( | |
|
325 | REDIS_KEYS['apps_that_got_new_data_per_hour'], | |
|
326 | resource_id, current_time.replace(minute=0)) | |
|
327 | redis_pipeline.execute() | |
|
318 | 328 | add_logs_es(es_docs) |
|
319 | 329 | return True |
|
320 | 330 | except Exception as exc: |
@@ -329,7 +339,7 b' def add_logs_es(es_docs):' | |||
|
329 | 339 | |
|
330 | 340 | |
|
331 | 341 | @celery.task(queue="metrics", default_retry_delay=600, max_retries=144) |
|
332 | def add_metrics(resource_id, request, dataset, proto_version): | |
|
342 | def add_metrics(resource_id, request_params, dataset, proto_version): | |
|
333 | 343 | current_time = datetime.utcnow().replace(second=0, microsecond=0) |
|
334 | 344 | try: |
|
335 | 345 | application = ApplicationService.by_id_cached()(resource_id) |
@@ -361,13 +371,18 b' def add_metrics(resource_id, request, dataset, proto_version):' | |||
|
361 | 371 | log.info(metrics_msg) |
|
362 | 372 | |
|
363 | 373 | mark_changed(session) |
|
374 | redis_pipeline = Datastores.redis.pipeline(transaction=False) | |
|
364 | 375 | key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time) |
|
365 |
|
|
|
366 |
|
|
|
367 |
key = REDIS_KEYS['counters']['metrics_per_ |
|
|
368 | resource_id, current_time) | |
|
369 |
|
|
|
370 |
|
|
|
376 | redis_pipeline.incr(key, len(rows)) | |
|
377 | redis_pipeline.expire(key, 3600 * 24) | |
|
378 | key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format( | |
|
379 | resource_id, current_time.replace(minute=0)) | |
|
380 | redis_pipeline.incr(key, len(rows)) | |
|
381 | redis_pipeline.expire(key, 3600 * 24 * 7) | |
|
382 | redis_pipeline.sadd( | |
|
383 | REDIS_KEYS['apps_that_got_new_data_per_hour'], | |
|
384 | resource_id, current_time.replace(minute=0)) | |
|
385 | redis_pipeline.execute() | |
|
371 | 386 | add_metrics_es(es_docs) |
|
372 | 387 | return True |
|
373 | 388 | except Exception as exc: |
@@ -35,8 +35,11 b' def rate_limiting(request, resource, section, to_increment=1):' | |||
|
35 | 35 | tsample = datetime.datetime.utcnow().replace(second=0, microsecond=0) |
|
36 | 36 | key = REDIS_KEYS['rate_limits'][section].format(tsample, |
|
37 | 37 | resource.resource_id) |
|
38 | current_count = Datastores.redis.incr(key, to_increment) | |
|
39 | Datastores.redis.expire(key, 3600 * 24) | |
|
38 | redis_pipeline = request.registry.redis_conn.pipeline() | |
|
39 | redis_pipeline.incr(key, to_increment) | |
|
40 | redis_pipeline.expire(key, 3600 * 24) | |
|
41 | results = redis_pipeline.execute() | |
|
42 | current_count = results[0] | |
|
40 | 43 | config = ConfigService.by_key_and_section(section, 'global') |
|
41 | 44 | limit = config.value if config else 1000 |
|
42 | 45 | if current_count > int(limit): |
@@ -28,15 +28,15 b' REDIS_KEYS = {' | |||
|
28 | 28 | }, |
|
29 | 29 | 'counters': { |
|
30 | 30 | 'reports_per_minute': BASE.format('reports_per_minute:{}'), |
|
31 |
'reports_per_ |
|
|
32 |
'reports_per_ |
|
|
31 | 'reports_per_hour_per_app': BASE.format( | |
|
32 | 'reports_per_hour_per_app:{}:{}'), | |
|
33 | 33 | 'reports_per_type': BASE.format('reports_per_type:{}'), |
|
34 | 34 | 'logs_per_minute': BASE.format('logs_per_minute:{}'), |
|
35 |
'logs_per_ |
|
|
36 |
'logs_per_ |
|
|
35 | 'logs_per_hour_per_app': BASE.format( | |
|
36 | 'logs_per_hour_per_app:{}:{}'), | |
|
37 | 37 | 'metrics_per_minute': BASE.format('metrics_per_minute:{}'), |
|
38 |
'metrics_per_ |
|
|
39 |
'metrics_per_ |
|
|
38 | 'metrics_per_hour_per_app': BASE.format( | |
|
39 | 'metrics_per_hour_per_app:{}:{}'), | |
|
40 | 40 | 'report_group_occurences': BASE.format('report_group_occurences:{}'), |
|
41 | 41 | 'report_group_occurences_alerting': BASE.format( |
|
42 | 42 | 'report_group_occurences_alerting:{}'), |
@@ -53,6 +53,7 b' REDIS_KEYS = {' | |||
|
53 | 53 | 'per_application_metrics_rate_limit': BASE.format( |
|
54 | 54 | 'per_application_metrics_rate_limit:{}:{}'), |
|
55 | 55 | }, |
|
56 | 'apps_that_got_new_data_per_hour': BASE.format('apps_that_got_new_data_per_hour'), | |
|
56 | 57 | 'apps_that_had_reports': BASE.format('apps_that_had_reports'), |
|
57 | 58 | 'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'), |
|
58 | 59 | 'apps_that_had_reports_alerting': BASE.format( |
@@ -191,45 +191,46 b' class ReportGroup(Base, BaseModel):' | |||
|
191 | 191 | # global app counter |
|
192 | 192 | key = REDIS_KEYS['counters']['reports_per_type'].format( |
|
193 | 193 | self.report_type, current_time) |
|
194 |
Datastores.redis. |
|
|
195 | Datastores.redis.expire(key, 3600 * 24) | |
|
194 | redis_pipeline = Datastores.redis.pipeline() | |
|
195 | redis_pipeline.incr(key) | |
|
196 | redis_pipeline.expire(key, 3600 * 24) | |
|
196 | 197 | # detailed app notification for alerts and notifications |
|
197 | Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'], | |
|
198 | self.resource_id) | |
|
199 | Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports_alerting'], | |
|
200 | self.resource_id) | |
|
198 | redis_pipeline.sadd( | |
|
199 | REDIS_KEYS['apps_that_had_reports'], self.resource_id) | |
|
200 | redis_pipeline.sadd( | |
|
201 | REDIS_KEYS['apps_that_had_reports_alerting'], self.resource_id) | |
|
201 | 202 | # only notify for exceptions here |
|
202 | 203 | if self.report_type == ReportType.error: |
|
203 |
|
|
|
204 | REDIS_KEYS['apps_that_had_reports'], | |
|
205 | self.resource_id) | |
|
206 | Datastores.redis.sadd( | |
|
204 | redis_pipeline.sadd( | |
|
205 | REDIS_KEYS['apps_that_had_reports'], self.resource_id) | |
|
206 | redis_pipeline.sadd( | |
|
207 | 207 | REDIS_KEYS['apps_that_had_error_reports_alerting'], |
|
208 | 208 | self.resource_id) |
|
209 | 209 | key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id) |
|
210 |
|
|
|
211 |
|
|
|
210 | redis_pipeline.incr(key) | |
|
211 | redis_pipeline.expire(key, 3600 * 24) | |
|
212 | 212 | key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id) |
|
213 |
|
|
|
214 |
|
|
|
213 | redis_pipeline.incr(key) | |
|
214 | redis_pipeline.expire(key, 3600 * 24) | |
|
215 | 215 | |
|
216 | 216 | if notify_10: |
|
217 | 217 | key = REDIS_KEYS['counters'][ |
|
218 | 218 | 'report_group_occurences_10th'].format(self.id) |
|
219 |
|
|
|
219 | redis_pipeline.setex(key, 3600 * 24, 1) | |
|
220 | 220 | if notify_100: |
|
221 | 221 | key = REDIS_KEYS['counters'][ |
|
222 | 222 | 'report_group_occurences_100th'].format(self.id) |
|
223 |
|
|
|
223 | redis_pipeline.setex(key, 3600 * 24, 1) | |
|
224 | 224 | |
|
225 | 225 | key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format( |
|
226 | 226 | self.report_type, self.resource_id) |
|
227 |
|
|
|
228 |
|
|
|
227 | redis_pipeline.sadd(key, self.id) | |
|
228 | redis_pipeline.expire(key, 3600 * 24) | |
|
229 | 229 | key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format( |
|
230 | 230 | self.report_type, self.resource_id) |
|
231 |
|
|
|
232 |
|
|
|
231 | redis_pipeline.sadd(key, self.id) | |
|
232 | redis_pipeline.expire(key, 3600 * 24) | |
|
233 | redis_pipeline.execute() | |
|
233 | 234 | |
|
234 | 235 | @property |
|
235 | 236 | def partition_id(self): |
@@ -56,14 +56,13 b' def system(request):' | |||
|
56 | 56 | current_time = datetime.utcnow(). \ |
|
57 | 57 | replace(second=0, microsecond=0) - timedelta(minutes=1) |
|
58 | 58 | # global app counter |
|
59 | ||
|
60 | processed_reports = Datastores.redis.get( | |
|
59 | processed_reports = request.registry.redis_conn.get( | |
|
61 | 60 | REDIS_KEYS['counters']['reports_per_minute'].format(current_time)) |
|
62 | 61 | processed_reports = int(processed_reports) if processed_reports else 0 |
|
63 |
processed_logs = |
|
|
62 | processed_logs = request.registry.redis_conn.get( | |
|
64 | 63 | REDIS_KEYS['counters']['logs_per_minute'].format(current_time)) |
|
65 | 64 | processed_logs = int(processed_logs) if processed_logs else 0 |
|
66 |
processed_metrics = |
|
|
65 | processed_metrics = request.registry.redis_conn.get( | |
|
67 | 66 | REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)) |
|
68 | 67 | processed_metrics = int(processed_metrics) if processed_metrics else 0 |
|
69 | 68 |
General Comments 0
You need to be logged in to leave comments.
Login now