From c5795d037536bd5c300d559be62a263c11ca00a0 2017-02-03 14:15:45 From: Marcin Lulek Date: 2017-02-03 14:15:45 Subject: [PATCH] reports: enforce more uniform distribution of reports between partitions --- diff --git a/backend/src/appenlight/celery/tasks.py b/backend/src/appenlight/celery/tasks.py index be24e74..07b949a 100644 --- a/backend/src/appenlight/celery/tasks.py +++ b/backend/src/appenlight/celery/tasks.py @@ -110,9 +110,11 @@ def add_reports(resource_id, request_params, dataset, **kwargs): report.set_data(report_data, resource, proto_version) report._skip_ft_index = True + # find latest group in this months partition report_group = ReportGroupService.by_hash_and_resource( report.resource_id, - report.grouping_hash + report.grouping_hash, + since_when=datetime.utcnow().date().replace(day=1) ) occurences = report_data.get('occurences', 1) if not report_group: diff --git a/backend/src/appenlight/models/report.py b/backend/src/appenlight/models/report.py index a231d4a..d058e05 100644 --- a/backend/src/appenlight/models/report.py +++ b/backend/src/appenlight/models/report.py @@ -19,7 +19,7 @@ # services, and proprietary license terms, please see # https://rhodecode.com/licenses/ -from datetime import datetime +from datetime import datetime, timedelta import math import uuid import hashlib @@ -380,7 +380,7 @@ class Report(Base, BaseModel): server_name = self.tags.get('server_name') or '' if default_grouping == 'url_traceback': hash_string = '%s_%s_%s' % (self.traceback_hash, location, - self.error) + self.error) if self.language == Language.javascript: hash_string = '%s_%s' % (self.traceback_hash, self.error) @@ -390,6 +390,8 @@ class Report(Base, BaseModel): hash_string = '%s_%s' % (self.traceback_hash, server_name) else: hash_string = '%s_%s' % (self.error, location) + month = datetime.utcnow().date().replace(day=1) + hash_string = '{}_{}'.format(month, hash_string) binary_string = hash_string.encode('utf8') self.grouping_hash = hashlib.sha1(binary_string).hexdigest() return self.grouping_hash @@ -442,7 +444,7 @@ class Report(Base, BaseModel): } channelstream_request(settings['cometd.secret'], '/message', [payload], - servers=[settings['cometd_servers']]) + servers=[settings['cometd_servers']]) def es_doc(self): tags = {} @@ -483,6 +485,12 @@ class Report(Base, BaseModel): def partition_id(self): return 'rcae_r_%s' % self.report_group_time.strftime('%Y_%m') + def partition_range(self): + start_date = self.report_group_time.date().replace(day=1) + end_date = start_date + timedelta(days=40) + end_date = end_date.replace(day=1) + return start_date, end_date + def after_insert(mapper, connection, target): if not hasattr(target, '_skip_ft_index'): diff --git a/backend/src/appenlight/models/report_group.py b/backend/src/appenlight/models/report_group.py index 1254899..9c80317 100644 --- a/backend/src/appenlight/models/report_group.py +++ b/backend/src/appenlight/models/report_group.py @@ -22,7 +22,7 @@ import logging import sqlalchemy as sa -from datetime import datetime +from datetime import datetime, timedelta from pyramid.threadlocal import get_current_request from sqlalchemy.dialects.postgresql import JSON @@ -209,7 +209,8 @@ class ReportGroup(Base, BaseModel): key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id) redis_pipeline.incr(key) redis_pipeline.expire(key, 3600 * 24) - key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id) + key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format( + self.id) redis_pipeline.incr(key) redis_pipeline.expire(key, 3600 * 24) @@ -236,6 +237,12 @@ class ReportGroup(Base, BaseModel): def partition_id(self): return 'rcae_r_%s' % self.first_timestamp.strftime('%Y_%m') + def partition_range(self): + start_date = self.first_timestamp.date().replace(day=1) + end_date = start_date + timedelta(days=40) + end_date = end_date.replace(day=1) + return start_date, end_date + def after_insert(mapper, connection, target): if not hasattr(target, '_skip_ft_index'): @@ -256,7 +263,7 @@ def after_update(mapper, connection, target): def after_delete(mapper, connection, target): query = {'term': {'group_id': target.id}} # TODO: routing seems unnecessary, need to test a bit more - #Datastores.es.delete_by_query(target.partition_id, 'report', query, + # Datastores.es.delete_by_query(target.partition_id, 'report', query, # query_params={'routing':str(target.id)}) Datastores.es.delete_by_query(target.partition_id, 'report', query) query = {'term': {'pg_id': target.id}} diff --git a/backend/src/appenlight/models/services/report_group.py b/backend/src/appenlight/models/services/report_group.py index 4eb3137..50007e7 100644 --- a/backend/src/appenlight/models/services/report_group.py +++ b/backend/src/appenlight/models/services/report_group.py @@ -333,13 +333,15 @@ class ReportGroupService(BaseService): return query @classmethod - def by_hash_and_resource(self, resource_id, - grouping_hash, db_session=None): + def by_hash_and_resource(cls, resource_id, grouping_hash, since_when=None, + db_session=None): db_session = get_db_session(db_session) q = db_session.query(ReportGroup) q = q.filter(ReportGroup.resource_id == resource_id) q = q.filter(ReportGroup.grouping_hash == grouping_hash) q = q.filter(ReportGroup.fixed == False) + if since_when: + q = q.filter(ReportGroup.first_timestamp >= since_when) return q.first() @classmethod