@@ -91,8 +91,8 @@ def test_retry_exception_task():
 
 
 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
-def add_reports(resource_id, params, dataset, environ=None, **kwargs):
-    proto_version = parse_proto(params.get('protocol_version', ''))
+def add_reports(resource_id, request_params, dataset, **kwargs):
+    proto_version = parse_proto(request_params.get('protocol_version', ''))
     current_time = datetime.utcnow().replace(second=0, microsecond=0)
     try:
         # we will store solr docs here for single insert
@@ -194,13 +194,18 @@ def add_reports(resource_id, params, dataset, environ=None, **kwargs):
             proto_version)
         log.info(log_msg)
         total_reports = len(dataset)
+        redis_pipeline = Datastores.redis.pipeline(transaction=False)
         key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
-        Datastores.redis.incr(key, total_reports)
-        Datastores.redis.expire(key, 3600 * 24)
-        key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
-            resource_id, current_time)
-        Datastores.redis.incr(key, total_reports)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key, total_reports)
+        redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.incr(key, total_reports)
+        redis_pipeline.expire(key, 3600 * 24 * 7)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_got_new_data_per_hour'],
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.execute()
 
         add_reports_es(es_report_group_docs, es_report_docs)
         add_reports_slow_calls_es(es_slow_calls_docs)
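
The hunk above is the core of the change: each Redis command used to cost its own network round trip, while the pipeline buffers them and flushes the whole batch in a single request when execute() is called. A minimal sketch of the same pattern with the redis-py client (the connection setup and key name here are illustrative assumptions, not AppEnlight's actual wiring):

    # Sketch of pipelined counter updates, assuming the redis-py package
    # and a local Redis instance; the key name is illustrative.
    from datetime import datetime

    import redis

    client = redis.StrictRedis(host='localhost', port=6379, db=0)

    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    key = 'counters:reports_per_minute:{}'.format(current_time)

    # transaction=False buffers the commands without a MULTI/EXEC wrapper;
    # they are still sent to the server together in one request.
    pipeline = client.pipeline(transaction=False)
    pipeline.incr(key, 42)           # bump the minute counter by 42 reports
    pipeline.expire(key, 3600 * 24)  # keep the counter around for a day
    pipeline.execute()               # one round trip for both commands
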
@@ -233,8 +238,8 @@ def add_reports_stats_rows_es(es_docs):
 
 
 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
-def add_logs(resource_id, request, dataset, environ=None, **kwargs):
-    proto_version = request.get('protocol_version')
+def add_logs(resource_id, request_params, dataset, **kwargs):
+    proto_version = request_params.get('protocol_version')
     current_time = datetime.utcnow().replace(second=0, microsecond=0)
 
     try:
@@ -308,13 +313,18 @@ def add_logs(resource_id, request, dataset, environ=None, **kwargs):
             proto_version)
         log.info(log_msg)
         # mark_changed(session)
+        redis_pipeline = Datastores.redis.pipeline(transaction=False)
         key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
-        Datastores.redis.incr(key, total_logs)
-        Datastores.redis.expire(key, 3600 * 24)
-        key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
-            resource_id, current_time)
-        Datastores.redis.incr(key, total_logs)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key, total_logs)
+        redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format(
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.incr(key, total_logs)
+        redis_pipeline.expire(key, 3600 * 24 * 7)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_got_new_data_per_hour'],
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.execute()
         add_logs_es(es_docs)
         return True
     except Exception as exc:
@@ -329,7 +339,7 @@ def add_logs_es(es_docs):
 
 
 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
-def add_metrics(resource_id, request, dataset, proto_version):
+def add_metrics(resource_id, request_params, dataset, proto_version):
     current_time = datetime.utcnow().replace(second=0, microsecond=0)
     try:
         application = ApplicationService.by_id_cached()(resource_id)
@@ -361,13 +371,18 @@ def add_metrics(resource_id, request, dataset, proto_version):
         log.info(metrics_msg)
 
         mark_changed(session)
+        redis_pipeline = Datastores.redis.pipeline(transaction=False)
         key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
-        Datastores.redis.incr(key, len(rows))
-        Datastores.redis.expire(key, 3600 * 24)
-        key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
-            resource_id, current_time)
-        Datastores.redis.incr(key, len(rows))
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key, len(rows))
+        redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format(
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.incr(key, len(rows))
+        redis_pipeline.expire(key, 3600 * 24 * 7)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_got_new_data_per_hour'],
+            resource_id, current_time.replace(minute=0))
+        redis_pipeline.execute()
         add_metrics_es(es_docs)
         return True
     except Exception as exc:
@@ -35,8 +35,11 @@ def rate_limiting(request, resource, section, to_increment=1):
     tsample = datetime.datetime.utcnow().replace(second=0, microsecond=0)
     key = REDIS_KEYS['rate_limits'][section].format(tsample,
                                                     resource.resource_id)
-    current_count = Datastores.redis.incr(key, to_increment)
-    Datastores.redis.expire(key, 3600 * 24)
+    redis_pipeline = request.registry.redis_conn.pipeline()
+    redis_pipeline.incr(key, to_increment)
+    redis_pipeline.expire(key, 3600 * 24)
+    results = redis_pipeline.execute()
+    current_count = results[0]
    config = ConfigService.by_key_and_section(section, 'global')
    limit = config.value if config else 1000
    if current_count > int(limit):
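
Unlike the fire-and-forget counters in the Celery tasks, rate_limiting needs the value INCR returns, so the new code reads it from the list that execute() hands back: replies come back in the same order the commands were queued. A sketch of that pattern, with an illustrative key and limit:

    # Sketch: execute() returns one reply per queued command, in order,
    # so the INCR result (the new counter value) is results[0] and the
    # EXPIRE result (a boolean) is results[1].
    import redis

    client = redis.StrictRedis()
    pipeline = client.pipeline()
    pipeline.incr('rate_limits:reports:example', 1)
    pipeline.expire('rate_limits:reports:example', 3600 * 24)
    results = pipeline.execute()
    current_count = results[0]
    if current_count > 1000:  # mirrors the default limit used above
        print('rate limit exceeded')

Note that pipeline() with no arguments wraps the batch in MULTI/EXEC, so this variant is also applied atomically; the tasks above opt out of that with transaction=False, since plain counter bumps do not need it.
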
@@ -28,15 +28,15 @@ REDIS_KEYS = {
     },
     'counters': {
         'reports_per_minute': BASE.format('reports_per_minute:{}'),
-        'reports_per_minute_per_app': BASE.format(
-            'reports_per_minute_per_app:{}:{}'),
+        'reports_per_hour_per_app': BASE.format(
+            'reports_per_hour_per_app:{}:{}'),
         'reports_per_type': BASE.format('reports_per_type:{}'),
         'logs_per_minute': BASE.format('logs_per_minute:{}'),
-        'logs_per_minute_per_app': BASE.format(
-            'logs_per_minute_per_app:{}:{}'),
+        'logs_per_hour_per_app': BASE.format(
+            'logs_per_hour_per_app:{}:{}'),
         'metrics_per_minute': BASE.format('metrics_per_minute:{}'),
-        'metrics_per_minute_per_app': BASE.format(
-            'metrics_per_minute_per_app:{}:{}'),
+        'metrics_per_hour_per_app': BASE.format(
+            'metrics_per_hour_per_app:{}:{}'),
         'report_group_occurences': BASE.format('report_group_occurences:{}'),
         'report_group_occurences_alerting': BASE.format(
             'report_group_occurences_alerting:{}'),
@@ -53,6 +53,7 @@ REDIS_KEYS = {
         'per_application_metrics_rate_limit': BASE.format(
             'per_application_metrics_rate_limit:{}:{}'),
     },
+    'apps_that_got_new_data_per_hour': BASE.format('apps_that_got_new_data_per_hour'),
     'apps_that_had_reports': BASE.format('apps_that_had_reports'),
     'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
     'apps_that_had_reports_alerting': BASE.format(
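
The key templates are formatted in two stages: BASE stamps the namespace prefix once at import time, and the remaining {} slots are filled at runtime with values like the resource id and the truncated timestamp. A small sketch, assuming BASE looks like 'appenlight:data:{}' (the real prefix may differ):

    # Two-stage key formatting, mirroring the REDIS_KEYS templates above.
    # The BASE prefix here is an assumption.
    from datetime import datetime

    BASE = 'appenlight:data:{}'
    template = BASE.format('reports_per_hour_per_app:{}:{}')
    # template == 'appenlight:data:reports_per_hour_per_app:{}:{}'

    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    key = template.format(51, current_time.replace(minute=0))
    # e.g. 'appenlight:data:reports_per_hour_per_app:51:2017-05-03 14:00:00'

Zeroing the minute is what turns the per-minute timestamp into an hourly bucket, matching the rename from the *_per_minute_per_app keys to *_per_hour_per_app.
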
@@ -191,45 +191,46 @@ class ReportGroup(Base, BaseModel):
         # global app counter
         key = REDIS_KEYS['counters']['reports_per_type'].format(
             self.report_type, current_time)
-        Datastores.redis.incr(key)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline = Datastores.redis.pipeline()
+        redis_pipeline.incr(key)
+        redis_pipeline.expire(key, 3600 * 24)
         # detailed app notification for alerts and notifications
-        Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'],
-                              self.resource_id)
-        Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports_alerting'],
-                              self.resource_id)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_had_reports'], self.resource_id)
+        redis_pipeline.sadd(
+            REDIS_KEYS['apps_that_had_reports_alerting'], self.resource_id)
         # only notify for exceptions here
         if self.report_type == ReportType.error:
-            Datastores.redis.sadd(
-                REDIS_KEYS['apps_that_had_reports'],
-                self.resource_id)
-            Datastores.redis.sadd(
+            redis_pipeline.sadd(
+                REDIS_KEYS['apps_that_had_reports'], self.resource_id)
+            redis_pipeline.sadd(
                 REDIS_KEYS['apps_that_had_error_reports_alerting'],
                 self.resource_id)
         key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
-        Datastores.redis.incr(key)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key)
+        redis_pipeline.expire(key, 3600 * 24)
         key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id)
-        Datastores.redis.incr(key)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.incr(key)
+        redis_pipeline.expire(key, 3600 * 24)
 
         if notify_10:
             key = REDIS_KEYS['counters'][
                 'report_group_occurences_10th'].format(self.id)
-            Datastores.redis.setex(key, 3600 * 24, 1)
+            redis_pipeline.setex(key, 3600 * 24, 1)
         if notify_100:
             key = REDIS_KEYS['counters'][
                 'report_group_occurences_100th'].format(self.id)
-            Datastores.redis.setex(key, 3600 * 24, 1)
+            redis_pipeline.setex(key, 3600 * 24, 1)
 
         key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
             self.report_type, self.resource_id)
-        Datastores.redis.sadd(key, self.id)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.sadd(key, self.id)
+        redis_pipeline.expire(key, 3600 * 24)
         key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format(
             self.report_type, self.resource_id)
-        Datastores.redis.sadd(key, self.id)
-        Datastores.redis.expire(key, 3600 * 24)
+        redis_pipeline.sadd(key, self.id)
+        redis_pipeline.expire(key, 3600 * 24)
+        redis_pipeline.execute()
 
     @property
     def partition_id(self):
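
Two of the pipelined commands in this hunk are SETEX calls, which write the "already notified at the 10th/100th occurrence" flag and its TTL in a single command, so the marker cleans itself up after a day. A hedged sketch of that marker pattern (client setup and key name are illustrative):

    # Sketch: SETEX sets a value together with an expiry, used here as a
    # self-expiring notification marker inside the same pipeline.
    import redis

    client = redis.StrictRedis()
    pipeline = client.pipeline()  # default pipeline: batched and atomic
    marker = 'counters:report_group_occurences_10th:{}'.format(1234)
    pipeline.setex(marker, 3600 * 24, 1)  # flag the 10th occurrence for 24h
    pipeline.execute()
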
@@ -56,14 +56,13 @@ def system(request):
     current_time = datetime.utcnow(). \
         replace(second=0, microsecond=0) - timedelta(minutes=1)
     # global app counter
-
-    processed_reports = Datastores.redis.get(
+    processed_reports = request.registry.redis_conn.get(
         REDIS_KEYS['counters']['reports_per_minute'].format(current_time))
     processed_reports = int(processed_reports) if processed_reports else 0
-    processed_logs = Datastores.redis.get(
+    processed_logs = request.registry.redis_conn.get(
         REDIS_KEYS['counters']['logs_per_minute'].format(current_time))
     processed_logs = int(processed_logs) if processed_logs else 0
-    processed_metrics = Datastores.redis.get(
+    processed_metrics = request.registry.redis_conn.get(
         REDIS_KEYS['counters']['metrics_per_minute'].format(current_time))
     processed_metrics = int(processed_metrics) if processed_metrics else 0
 
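
This admin view is the read side of the counters written above: it fetches the previous minute's totals one GET at a time, treating a missing key as zero. Since the three keys follow the same template, the reads could also be batched; a sketch with MGET, assuming the same 'appenlight:data:' prefix as in the earlier sketch:

    # Sketch: fetching the previous minute's counters in one round trip.
    # The key prefix is an assumption; MGET returns None for missing keys.
    from datetime import datetime, timedelta

    import redis

    client = redis.StrictRedis()
    current_time = (datetime.utcnow().replace(second=0, microsecond=0)
                    - timedelta(minutes=1))
    keys = ['appenlight:data:{}_per_minute:{}'.format(name, current_time)
            for name in ('reports', 'logs', 'metrics')]
    reports, logs, metrics = [int(v) if v else 0 for v in client.mget(keys)]
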