Show More
@@ -198,6 +198,10 b' def add_reports(resource_id, request_params, dataset, **kwargs):' | |||||
198 | key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time) |
|
198 | key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time) | |
199 | redis_pipeline.incr(key, total_reports) |
|
199 | redis_pipeline.incr(key, total_reports) | |
200 | redis_pipeline.expire(key, 3600 * 24) |
|
200 | redis_pipeline.expire(key, 3600 * 24) | |
|
201 | key = REDIS_KEYS['counters']['events_per_minute_per_user'].format( | |||
|
202 | resource.owner_user_id, current_time) | |||
|
203 | redis_pipeline.incr(key, total_reports) | |||
|
204 | redis_pipeline.expire(key, 3600) | |||
201 | key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format( |
|
205 | key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format( | |
202 | resource_id, current_time.replace(minute=0)) |
|
206 | resource_id, current_time.replace(minute=0)) | |
203 | redis_pipeline.incr(key, total_reports) |
|
207 | redis_pipeline.incr(key, total_reports) | |
@@ -244,7 +248,8 b' def add_logs(resource_id, request_params, dataset, **kwargs):' | |||||
244 |
|
248 | |||
245 | try: |
|
249 | try: | |
246 | es_docs = collections.defaultdict(list) |
|
250 | es_docs = collections.defaultdict(list) | |
247 |
|
|
251 | resource = ApplicationService.by_id_cached()(resource_id) | |
|
252 | resource = DBSession.merge(resource, load=False) | |||
248 | ns_pairs = [] |
|
253 | ns_pairs = [] | |
249 | for entry in dataset: |
|
254 | for entry in dataset: | |
250 | # gather pk and ns so we can remove older versions of row later |
|
255 | # gather pk and ns so we can remove older versions of row later | |
@@ -252,9 +257,9 b' def add_logs(resource_id, request_params, dataset, **kwargs):' | |||||
252 | ns_pairs.append({"pk": entry['primary_key'], |
|
257 | ns_pairs.append({"pk": entry['primary_key'], | |
253 | "ns": entry['namespace']}) |
|
258 | "ns": entry['namespace']}) | |
254 | log_entry = Log() |
|
259 | log_entry = Log() | |
255 |
log_entry.set_data(entry, resource= |
|
260 | log_entry.set_data(entry, resource=resource) | |
256 | log_entry._skip_ft_index = True |
|
261 | log_entry._skip_ft_index = True | |
257 |
|
|
262 | resource.logs.append(log_entry) | |
258 | DBSession.flush() |
|
263 | DBSession.flush() | |
259 | # insert non pk rows first |
|
264 | # insert non pk rows first | |
260 | if entry['primary_key'] is None: |
|
265 | if entry['primary_key'] is None: | |
@@ -308,7 +313,7 b' def add_logs(resource_id, request_params, dataset, **kwargs):' | |||||
308 | total_logs = len(dataset) |
|
313 | total_logs = len(dataset) | |
309 |
|
314 | |||
310 | log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % ( |
|
315 | log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % ( | |
311 |
str( |
|
316 | str(resource), | |
312 | total_logs, |
|
317 | total_logs, | |
313 | proto_version) |
|
318 | proto_version) | |
314 | log.info(log_msg) |
|
319 | log.info(log_msg) | |
@@ -317,6 +322,10 b' def add_logs(resource_id, request_params, dataset, **kwargs):' | |||||
317 | key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time) |
|
322 | key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time) | |
318 | redis_pipeline.incr(key, total_logs) |
|
323 | redis_pipeline.incr(key, total_logs) | |
319 | redis_pipeline.expire(key, 3600 * 24) |
|
324 | redis_pipeline.expire(key, 3600 * 24) | |
|
325 | key = REDIS_KEYS['counters']['events_per_minute_per_user'].format( | |||
|
326 | resource.owner_user_id, current_time) | |||
|
327 | redis_pipeline.incr(key, total_logs) | |||
|
328 | redis_pipeline.expire(key, 3600) | |||
320 | key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format( |
|
329 | key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format( | |
321 | resource_id, current_time.replace(minute=0)) |
|
330 | resource_id, current_time.replace(minute=0)) | |
322 | redis_pipeline.incr(key, total_logs) |
|
331 | redis_pipeline.incr(key, total_logs) | |
@@ -342,8 +351,8 b' def add_logs_es(es_docs):' | |||||
342 | def add_metrics(resource_id, request_params, dataset, proto_version): |
|
351 | def add_metrics(resource_id, request_params, dataset, proto_version): | |
343 | current_time = datetime.utcnow().replace(second=0, microsecond=0) |
|
352 | current_time = datetime.utcnow().replace(second=0, microsecond=0) | |
344 | try: |
|
353 | try: | |
345 |
|
|
354 | resource = ApplicationService.by_id_cached()(resource_id) | |
346 |
|
|
355 | resource = DBSession.merge(resource, load=False) | |
347 | es_docs = [] |
|
356 | es_docs = [] | |
348 | rows = [] |
|
357 | rows = [] | |
349 | for metric in dataset: |
|
358 | for metric in dataset: | |
@@ -352,7 +361,7 b' def add_metrics(resource_id, request_params, dataset, proto_version):' | |||||
352 | tags['server_name'] = server_n or 'unknown' |
|
361 | tags['server_name'] = server_n or 'unknown' | |
353 | new_metric = Metric( |
|
362 | new_metric = Metric( | |
354 | timestamp=metric['timestamp'], |
|
363 | timestamp=metric['timestamp'], | |
355 |
resource_id= |
|
364 | resource_id=resource.resource_id, | |
356 | namespace=metric['namespace'], |
|
365 | namespace=metric['namespace'], | |
357 | tags=tags) |
|
366 | tags=tags) | |
358 | rows.append(new_metric) |
|
367 | rows.append(new_metric) | |
@@ -364,7 +373,7 b' def add_metrics(resource_id, request_params, dataset, proto_version):' | |||||
364 | action = 'METRICS' |
|
373 | action = 'METRICS' | |
365 | metrics_msg = '%s: %s, metrics: %s, proto:%s' % ( |
|
374 | metrics_msg = '%s: %s, metrics: %s, proto:%s' % ( | |
366 | action, |
|
375 | action, | |
367 |
str( |
|
376 | str(resource), | |
368 | len(dataset), |
|
377 | len(dataset), | |
369 | proto_version |
|
378 | proto_version | |
370 | ) |
|
379 | ) | |
@@ -375,6 +384,10 b' def add_metrics(resource_id, request_params, dataset, proto_version):' | |||||
375 | key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time) |
|
384 | key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time) | |
376 | redis_pipeline.incr(key, len(rows)) |
|
385 | redis_pipeline.incr(key, len(rows)) | |
377 | redis_pipeline.expire(key, 3600 * 24) |
|
386 | redis_pipeline.expire(key, 3600 * 24) | |
|
387 | key = REDIS_KEYS['counters']['events_per_minute_per_user'].format( | |||
|
388 | resource.owner_user_id, current_time) | |||
|
389 | redis_pipeline.incr(key, len(rows)) | |||
|
390 | redis_pipeline.expire(key, 3600) | |||
378 | key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format( |
|
391 | key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format( | |
379 | resource_id, current_time.replace(minute=0)) |
|
392 | resource_id, current_time.replace(minute=0)) | |
380 | redis_pipeline.incr(key, len(rows)) |
|
393 | redis_pipeline.incr(key, len(rows)) |
@@ -27,6 +27,8 b' REDIS_KEYS = {' | |||||
27 | 'add_logs_lock': BASE.format('add_logs_lock:{}'), |
|
27 | 'add_logs_lock': BASE.format('add_logs_lock:{}'), | |
28 | }, |
|
28 | }, | |
29 | 'counters': { |
|
29 | 'counters': { | |
|
30 | 'events_per_minute_per_user': BASE.format( | |||
|
31 | 'events_per_minute_per_user:{}:{}'), | |||
30 | 'reports_per_minute': BASE.format('reports_per_minute:{}'), |
|
32 | 'reports_per_minute': BASE.format('reports_per_minute:{}'), | |
31 | 'reports_per_hour_per_app': BASE.format( |
|
33 | 'reports_per_hour_per_app': BASE.format( | |
32 | 'reports_per_hour_per_app:{}:{}'), |
|
34 | 'reports_per_hour_per_app:{}:{}'), |
General Comments 0
You need to be logged in to leave comments.
Login now