##// END OF EJS Templates
reports: enforce more uniform distribution of reports between partitions
ergo -
r108:c5795d03 0.9.1
parent child Browse files
Show More
@@ -110,9 +110,11 b' def add_reports(resource_id, request_params, dataset, **kwargs):'
110 report.set_data(report_data, resource, proto_version)
110 report.set_data(report_data, resource, proto_version)
111 report._skip_ft_index = True
111 report._skip_ft_index = True
112
112
113 # find latest group in this month's partition
113 report_group = ReportGroupService.by_hash_and_resource(
114 report_group = ReportGroupService.by_hash_and_resource(
114 report.resource_id,
115 report.resource_id,
115 report.grouping_hash
116 report.grouping_hash,
117 since_when=datetime.utcnow().date().replace(day=1)
116 )
118 )
117 occurences = report_data.get('occurences', 1)
119 occurences = report_data.get('occurences', 1)
118 if not report_group:
120 if not report_group:
@@ -19,7 +19,7 b''
19 # services, and proprietary license terms, please see
19 # services, and proprietary license terms, please see
20 # https://rhodecode.com/licenses/
20 # https://rhodecode.com/licenses/
21
21
22 from datetime import datetime
22 from datetime import datetime, timedelta
23 import math
23 import math
24 import uuid
24 import uuid
25 import hashlib
25 import hashlib
@@ -380,7 +380,7 b' class Report(Base, BaseModel):'
380 server_name = self.tags.get('server_name') or ''
380 server_name = self.tags.get('server_name') or ''
381 if default_grouping == 'url_traceback':
381 if default_grouping == 'url_traceback':
382 hash_string = '%s_%s_%s' % (self.traceback_hash, location,
382 hash_string = '%s_%s_%s' % (self.traceback_hash, location,
383 self.error)
383 self.error)
384 if self.language == Language.javascript:
384 if self.language == Language.javascript:
385 hash_string = '%s_%s' % (self.traceback_hash, self.error)
385 hash_string = '%s_%s' % (self.traceback_hash, self.error)
386
386
@@ -390,6 +390,8 b' class Report(Base, BaseModel):'
390 hash_string = '%s_%s' % (self.traceback_hash, server_name)
390 hash_string = '%s_%s' % (self.traceback_hash, server_name)
391 else:
391 else:
392 hash_string = '%s_%s' % (self.error, location)
392 hash_string = '%s_%s' % (self.error, location)
393 month = datetime.utcnow().date().replace(day=1)
394 hash_string = '{}_{}'.format(month, hash_string)
393 binary_string = hash_string.encode('utf8')
395 binary_string = hash_string.encode('utf8')
394 self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
396 self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
395 return self.grouping_hash
397 return self.grouping_hash
@@ -442,7 +444,7 b' class Report(Base, BaseModel):'
442
444
443 }
445 }
444 channelstream_request(settings['cometd.secret'], '/message', [payload],
446 channelstream_request(settings['cometd.secret'], '/message', [payload],
445 servers=[settings['cometd_servers']])
447 servers=[settings['cometd_servers']])
446
448
447 def es_doc(self):
449 def es_doc(self):
448 tags = {}
450 tags = {}
@@ -483,6 +485,12 b' class Report(Base, BaseModel):'
483 def partition_id(self):
485 def partition_id(self):
484 return 'rcae_r_%s' % self.report_group_time.strftime('%Y_%m')
486 return 'rcae_r_%s' % self.report_group_time.strftime('%Y_%m')
485
487
488 def partition_range(self):
489 start_date = self.report_group_time.date().replace(day=1)
490 end_date = start_date + timedelta(days=40)
491 end_date = end_date.replace(day=1)
492 return start_date, end_date
493
486
494
487 def after_insert(mapper, connection, target):
495 def after_insert(mapper, connection, target):
488 if not hasattr(target, '_skip_ft_index'):
496 if not hasattr(target, '_skip_ft_index'):
@@ -22,7 +22,7 b''
22 import logging
22 import logging
23 import sqlalchemy as sa
23 import sqlalchemy as sa
24
24
25 from datetime import datetime
25 from datetime import datetime, timedelta
26
26
27 from pyramid.threadlocal import get_current_request
27 from pyramid.threadlocal import get_current_request
28 from sqlalchemy.dialects.postgresql import JSON
28 from sqlalchemy.dialects.postgresql import JSON
@@ -209,7 +209,8 b' class ReportGroup(Base, BaseModel):'
209 key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
209 key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
210 redis_pipeline.incr(key)
210 redis_pipeline.incr(key)
211 redis_pipeline.expire(key, 3600 * 24)
211 redis_pipeline.expire(key, 3600 * 24)
212 key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id)
212 key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(
213 self.id)
213 redis_pipeline.incr(key)
214 redis_pipeline.incr(key)
214 redis_pipeline.expire(key, 3600 * 24)
215 redis_pipeline.expire(key, 3600 * 24)
215
216
@@ -236,6 +237,12 b' class ReportGroup(Base, BaseModel):'
236 def partition_id(self):
237 def partition_id(self):
237 return 'rcae_r_%s' % self.first_timestamp.strftime('%Y_%m')
238 return 'rcae_r_%s' % self.first_timestamp.strftime('%Y_%m')
238
239
240 def partition_range(self):
241 start_date = self.first_timestamp.date().replace(day=1)
242 end_date = start_date + timedelta(days=40)
243 end_date = end_date.replace(day=1)
244 return start_date, end_date
245
239
246
240 def after_insert(mapper, connection, target):
247 def after_insert(mapper, connection, target):
241 if not hasattr(target, '_skip_ft_index'):
248 if not hasattr(target, '_skip_ft_index'):
@@ -256,7 +263,7 b' def after_update(mapper, connection, target):'
256 def after_delete(mapper, connection, target):
263 def after_delete(mapper, connection, target):
257 query = {'term': {'group_id': target.id}}
264 query = {'term': {'group_id': target.id}}
258 # TODO: routing seems unnecessary, need to test a bit more
265 # TODO: routing seems unnecessary, need to test a bit more
259 #Datastores.es.delete_by_query(target.partition_id, 'report', query,
266 # Datastores.es.delete_by_query(target.partition_id, 'report', query,
260 # query_params={'routing':str(target.id)})
267 # query_params={'routing':str(target.id)})
261 Datastores.es.delete_by_query(target.partition_id, 'report', query)
268 Datastores.es.delete_by_query(target.partition_id, 'report', query)
262 query = {'term': {'pg_id': target.id}}
269 query = {'term': {'pg_id': target.id}}
@@ -333,13 +333,15 b' class ReportGroupService(BaseService):'
333 return query
333 return query
334
334
335 @classmethod
335 @classmethod
336 def by_hash_and_resource(self, resource_id,
336 def by_hash_and_resource(cls, resource_id, grouping_hash, since_when=None,
337 grouping_hash, db_session=None):
337 db_session=None):
338 db_session = get_db_session(db_session)
338 db_session = get_db_session(db_session)
339 q = db_session.query(ReportGroup)
339 q = db_session.query(ReportGroup)
340 q = q.filter(ReportGroup.resource_id == resource_id)
340 q = q.filter(ReportGroup.resource_id == resource_id)
341 q = q.filter(ReportGroup.grouping_hash == grouping_hash)
341 q = q.filter(ReportGroup.grouping_hash == grouping_hash)
342 q = q.filter(ReportGroup.fixed == False)
342 q = q.filter(ReportGroup.fixed == False)
343 if since_when:
344 q = q.filter(ReportGroup.first_timestamp >= since_when)
343 return q.first()
345 return q.first()
344
346
345 @classmethod
347 @classmethod
General Comments 0
You need to be logged in to leave comments. Login now