tasks: better counters
@@ -198,6 +198,10 @@ b' def add_reports(resource_id, request_params, dataset, **kwargs):'
         key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
         redis_pipeline.incr(key, total_reports)
         redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
+            resource.owner_user_id, current_time)
+        redis_pipeline.incr(key, total_reports)
+        redis_pipeline.expire(key, 3600)
         key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
             resource_id, current_time.replace(minute=0))
         redis_pipeline.incr(key, total_reports)
@@ -244,7 +248,8 @@ b' def add_logs(resource_id, request_params, dataset, **kwargs):'
 
     try:
         es_docs = collections.defaultdict(list)
-        application = ApplicationService.by_id(resource_id)
+        resource = ApplicationService.by_id_cached()(resource_id)
+        resource = DBSession.merge(resource, load=False)
         ns_pairs = []
         for entry in dataset:
             # gather pk and ns so we can remove older versions of row later
@@ -252,9 +257,9 @@ b' def add_logs(resource_id, request_params, dataset, **kwargs):'
                 ns_pairs.append({"pk": entry['primary_key'],
                                  "ns": entry['namespace']})
             log_entry = Log()
-            log_entry.set_data(entry, resource=application)
+            log_entry.set_data(entry, resource=resource)
             log_entry._skip_ft_index = True
-            application.logs.append(log_entry)
+            resource.logs.append(log_entry)
             DBSession.flush()
             # insert non pk rows first
             if entry['primary_key'] is None:
@@ -308,7 +313,7 @@ b' def add_logs(resource_id, request_params, dataset, **kwargs):'
         total_logs = len(dataset)
 
         log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
-            str(application),
+            str(resource),
             total_logs,
             proto_version)
         log.info(log_msg)
@@ -317,6 +322,10 @@ b' def add_logs(resource_id, request_params, dataset, **kwargs):'
         key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
         redis_pipeline.incr(key, total_logs)
         redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
+            resource.owner_user_id, current_time)
+        redis_pipeline.incr(key, total_logs)
+        redis_pipeline.expire(key, 3600)
         key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format(
             resource_id, current_time.replace(minute=0))
         redis_pipeline.incr(key, total_logs)
@@ -342,8 +351,8 @@ b' def add_logs_es(es_docs):'
 def add_metrics(resource_id, request_params, dataset, proto_version):
     current_time = datetime.utcnow().replace(second=0, microsecond=0)
     try:
-        application = ApplicationService.by_id_cached()(resource_id)
-        application = DBSession.merge(application, load=False)
+        resource = ApplicationService.by_id_cached()(resource_id)
+        resource = DBSession.merge(resource, load=False)
         es_docs = []
         rows = []
         for metric in dataset:
@@ -352,7 +361,7 @@ b' def add_metrics(resource_id, request_params, dataset, proto_version):'
             tags['server_name'] = server_n or 'unknown'
             new_metric = Metric(
                 timestamp=metric['timestamp'],
-                resource_id=application.resource_id,
+                resource_id=resource.resource_id,
                 namespace=metric['namespace'],
                 tags=tags)
             rows.append(new_metric)
@@ -364,7 +373,7 @@ b' def add_metrics(resource_id, request_params, dataset, proto_version):'
         action = 'METRICS'
         metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
             action,
-            str(application),
+            str(resource),
             len(dataset),
             proto_version
         )
@@ -375,6 +384,10 @@ b' def add_metrics(resource_id, request_params, dataset, proto_version):'
         key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
         redis_pipeline.incr(key, len(rows))
         redis_pipeline.expire(key, 3600 * 24)
+        key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
+            resource.owner_user_id, current_time)
+        redis_pipeline.incr(key, len(rows))
+        redis_pipeline.expire(key, 3600)
         key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format(
             resource_id, current_time.replace(minute=0))
         redis_pipeline.incr(key, len(rows))
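All three tasks above apply the same pattern: alongside the global per-minute counter, bump a per-user per-minute counter on the shared Redis pipeline and give it a short TTL so stale minute buckets clean themselves up. A minimal standalone sketch of that pattern, assuming a redis-py client; the helper name bump_user_event_counter is illustrative, not part of this changeset:

    from datetime import datetime

    import redis

    # Mirrors the 'events_per_minute_per_user' entry added to REDIS_KEYS below.
    EVENTS_PER_MINUTE_PER_USER = 'events_per_minute_per_user:{}:{}'

    def bump_user_event_counter(pipeline, owner_user_id, amount):
        # Truncate to minute resolution, matching current_time in the tasks above.
        current_time = datetime.utcnow().replace(second=0, microsecond=0)
        key = EVENTS_PER_MINUTE_PER_USER.format(owner_user_id, current_time)
        # INCR creates the key on first use; EXPIRE caps the bucket's lifetime
        # at one hour (the global per-minute counters keep 24 hours instead).
        pipeline.incr(key, amount)
        pipeline.expire(key, 3600)

    client = redis.StrictRedis()
    pipe = client.pipeline()
    bump_user_event_counter(pipe, owner_user_id=5, amount=10)
    pipe.execute()

Batching the INCR and EXPIRE on the existing pipeline means the new counter adds no extra round trips to the tasks.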
@@ -27,6 +27,8 @@ b' REDIS_KEYS = {'
         'add_logs_lock': BASE.format('add_logs_lock:{}'),
     },
     'counters': {
+        'events_per_minute_per_user': BASE.format(
+            'events_per_minute_per_user:{}:{}'),
         'reports_per_minute': BASE.format('reports_per_minute:{}'),
         'reports_per_hour_per_app': BASE.format(
            'reports_per_hour_per_app:{}:{}'),
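The new key takes two format arguments, the owner's user id and the minute bucket, so any reader must format it exactly as the writers do. The commit itself only writes the counter; one plausible read side, continuing the sketch above (the function name and the throttle threshold are assumptions, not part of this diff):

    def user_events_this_minute(client, owner_user_id):
        current_time = datetime.utcnow().replace(second=0, microsecond=0)
        key = EVENTS_PER_MINUTE_PER_USER.format(owner_user_id, current_time)
        value = client.get(key)
        # A missing key simply means no events were recorded this minute.
        return int(value) if value is not None else 0

    # Hypothetical throttle built on top of the counter:
    if user_events_this_minute(client, owner_user_id=5) > 10000:
        raise Exception('per-user event rate exceeded')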