# -*- coding: utf-8 -*-

# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
#
# This program is dual-licensed. If you wish to learn more about the
# AppEnlight Enterprise Edition, including its added features, Support
# services, and proprietary license terms, please see
# https://rhodecode.com/licenses/

from datetime import datetime
import math
import uuid
import hashlib
import copy
import urllib.parse
import logging
import sqlalchemy as sa

from appenlight.models import Base, Datastores
from appenlight.lib.utils.date_utils import convert_date
from appenlight.lib.utils import convert_es_type
from appenlight.models.slow_call import SlowCall
from appenlight.lib.utils import channelstream_request
from appenlight.lib.enums import ReportType, Language
from pyramid.threadlocal import get_current_registry, get_current_request
from sqlalchemy.dialects.postgresql import JSON
from ziggurat_foundations.models.base import BaseModel

log = logging.getLogger(__name__)

REPORT_TYPE_MATRIX = {
    'http_status': {"type": 'int', "ops": ('eq', 'ne', 'ge', 'le',)},
    'group:priority': {"type": 'int', "ops": ('eq', 'ne', 'ge', 'le',)},
    'duration': {"type": 'float', "ops": ('ge', 'le',)},
    'url_domain': {"type": 'unicode',
                   "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
    'url_path': {"type": 'unicode',
                 "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
    'error': {"type": 'unicode',
              "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
    'tags:server_name': {"type": 'unicode',
                         "ops": ('eq', 'ne', 'startswith', 'endswith',
                                 'contains',)},
    'traceback': {"type": 'unicode', "ops": ('contains',)},
    'group:occurences': {"type": 'int', "ops": ('eq', 'ne', 'ge', 'le',)}
}


class Report(Base, BaseModel):
    __tablename__ = 'reports'
    __table_args__ = {'implicit_returning': False}

    id = sa.Column(sa.Integer, nullable=False, primary_key=True)
    group_id = sa.Column(sa.BigInteger,
                         sa.ForeignKey('reports_groups.id', ondelete='cascade',
                                       onupdate='cascade'))
    resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
    report_type = sa.Column(sa.Integer(), nullable=False, index=True)
    error = sa.Column(sa.UnicodeText(), index=True)
    extra = sa.Column(JSON(), default={})
    request = sa.Column(JSON(), nullable=False, default={})
    ip = sa.Column(sa.String(39), index=True, default='')
    username = sa.Column(sa.Unicode(255), default='')
    user_agent = sa.Column(sa.Unicode(255), default='')
    url = sa.Column(sa.UnicodeText(), index=True)
    request_id = sa.Column(sa.Text())
    request_stats = sa.Column(JSON(), nullable=False, default={})
    traceback = sa.Column(JSON(), nullable=False, default=None)
    traceback_hash = sa.Column(sa.Text())
    start_time = sa.Column(sa.DateTime(), default=datetime.utcnow,
                           server_default=sa.func.now())
    end_time = sa.Column(sa.DateTime())
    duration = sa.Column(sa.Float, default=0)
    http_status = sa.Column(sa.Integer, index=True)
    url_domain = sa.Column(sa.Unicode(100), index=True)
    url_path = sa.Column(sa.Unicode(255), index=True)
    tags = sa.Column(JSON(), nullable=False,
                     default={})
    language = sa.Column(sa.Integer(), default=0)
    # this is used to determine the partition for the report
    report_group_time = sa.Column(sa.DateTime(), default=datetime.utcnow,
                                  server_default=sa.func.now())

    logs = sa.orm.relationship(
        'Log',
        lazy='dynamic',
        passive_deletes=True,
        passive_updates=True,
        primaryjoin="and_(Report.request_id==Log.request_id, "
                    "Log.request_id != None, Log.request_id != '')",
        foreign_keys='[Log.request_id]')

    slow_calls = sa.orm.relationship('SlowCall',
                                     backref='detail',
                                     cascade="all, delete-orphan",
                                     passive_deletes=True,
                                     passive_updates=True,
                                     order_by='SlowCall.timestamp')

    def set_data(self, data, resource, protocol_version=None):
        self.http_status = data['http_status']
        self.priority = data['priority']
        self.error = data['error']
        report_language = data.get('language', '').lower()
        self.language = getattr(Language, report_language, Language.unknown)
        # we need a temp holder here to decide later
        # if we want to commit the tags if the report is marked for creation
        self.tags = {
            'server_name': data['server'],
            'view_name': data['view_name']
        }
        if data.get('tags'):
            for tag_tuple in data['tags']:
                self.tags[tag_tuple[0]] = tag_tuple[1]
        self.traceback = data['traceback']
        stripped_traceback = self.stripped_traceback()
        tb_repr = repr(stripped_traceback).encode('utf8')
        self.traceback_hash = hashlib.sha1(tb_repr).hexdigest()
        url_info = urllib.parse.urlsplit(
            data.get('url', ''), allow_fragments=False)
        self.url_domain = url_info.netloc[:128]
        self.url_path = url_info.path[:2048]
        self.occurences = data['occurences']
        if self.error:
            self.report_type = ReportType.error
        else:
            self.report_type = ReportType.slow

        # if the status is 404 the report is classified as not_found instead
        if self.http_status in [404, '404'] or self.error == '404 Not Found':
            self.report_type = ReportType.not_found
            self.error = ''

        self.generate_grouping_hash(data.get('appenlight.group_string',
                                             data.get('group_string')),
                                    resource.default_grouping,
                                    protocol_version)

        # details
        if data['http_status'] in [404, '404']:
            data = {"username": data["username"],
                    "ip": data["ip"],
                    "url": data["url"],
                    "user_agent": data["user_agent"]}
            if data.get('HTTP_REFERER') or data.get('http_referer'):
                data['HTTP_REFERER'] = data.get(
                    'HTTP_REFERER', '') or data.get('http_referer', '')

        self.resource_id = resource.resource_id
        self.username = data['username']
        self.user_agent = data['user_agent']
        self.ip = data['ip']
        self.extra = {}
        if data.get('extra'):
            for extra_tuple in data['extra']:
                self.extra[extra_tuple[0]] = extra_tuple[1]

        self.url = data['url']
        self.request_id = data.get('request_id', '').replace('-', '') or str(
            uuid.uuid4())
        request_data = data.get('request', {})

        self.request = request_data
        self.request_stats = data.get('request_stats', {})
        traceback = data.get('traceback')
        if not traceback:
            traceback = data.get('frameinfo')
        self.traceback = traceback
        start_date = convert_date(data.get('start_time'))
        if not self.start_time or self.start_time < start_date:
            self.start_time = start_date

        self.end_time = convert_date(data.get('end_time'), False)
        self.duration = 0

        if self.start_time and self.end_time:
            d = self.end_time - self.start_time
            self.duration = d.total_seconds()

        # update tags with other vars
        if self.username:
            self.tags['user_name'] = self.username
        self.tags['report_language'] = Language.key_from_value(self.language)

    def add_slow_calls(self, data, report_group):
        slow_calls = []
        for call in data.get('slow_calls', []):
            sc_inst = SlowCall()
            sc_inst.set_data(call, resource_id=self.resource_id,
                             report_group=report_group)
            slow_calls.append(sc_inst)
        self.slow_calls.extend(slow_calls)
        return slow_calls

    def get_dict(self, request, details=False, exclude_keys=None,
                 include_keys=None):
        from appenlight.models.services.report_group import ReportGroupService
        instance_dict = super(Report, self).get_dict()
        instance_dict['req_stats'] = self.req_stats()
        instance_dict['group'] = {}
        instance_dict['group']['id'] = self.report_group.id
        instance_dict['group'][
            'total_reports'] = self.report_group.total_reports
        instance_dict['group']['last_report'] = self.report_group.last_report
        instance_dict['group']['priority'] = self.report_group.priority
        instance_dict['group']['occurences'] = self.report_group.occurences
        instance_dict['group'][
            'last_timestamp'] = self.report_group.last_timestamp
        instance_dict['group'][
            'first_timestamp'] = self.report_group.first_timestamp
        instance_dict['group']['public'] = self.report_group.public
        instance_dict['group']['fixed'] = self.report_group.fixed
        instance_dict['group']['read'] = self.report_group.read
        instance_dict['group'][
            'average_duration'] = self.report_group.average_duration
        instance_dict[
            'resource_name'] = self.report_group.application.resource_name
        instance_dict['report_type'] = self.report_type

        if instance_dict['http_status'] == 404 and not instance_dict['error']:
            instance_dict['error'] = '404 Not Found'

        if details:
            instance_dict['affected_users_count'] = \
                ReportGroupService.affected_users_count(self.report_group)
            instance_dict['top_affected_users'] = [
                {'username': u.username, 'count': u.count} for u in
                ReportGroupService.top_affected_users(self.report_group)]
            instance_dict['application'] = {'integrations': []}
            for integration in self.report_group.application.integrations:
                if integration.front_visible:
                    instance_dict['application']['integrations'].append(
                        {'name': integration.integration_name,
                         'action': integration.integration_action})
            instance_dict['comments'] = [c.get_dict() for c in
                                         self.report_group.comments]

            instance_dict['group']['next_report'] = None
            instance_dict['group']['previous_report'] = None
            next_in_group = self.get_next_in_group(request)
            previous_in_group = self.get_previous_in_group(request)
            if next_in_group:
                instance_dict['group']['next_report'] = next_in_group
            if previous_in_group:
                instance_dict['group']['previous_report'] = previous_in_group

            # slow call ordering
            def find_parent(row, data):
                for r in reversed(data):
                    try:
                        if (row['timestamp'] > r['timestamp'] and
                                row['end_time'] < r['end_time']):
                            return r
                    except TypeError as e:
                        log.warning('reports_view.find_parent: %s' % e)
                return None

            new_calls = []
            calls = [c.get_dict() for c in self.slow_calls]
            while calls:
                # start from end
                for x in range(len(calls) - 1, -1, -1):
                    parent = find_parent(calls[x], calls)
                    if parent:
                        parent['children'].append(calls[x])
                    else:
                        # no parent at all? append to new calls anyway
                        new_calls.append(calls[x])
                        # print 'append', calls[x]
                    del calls[x]
                    break
            instance_dict['slow_calls'] = new_calls

        instance_dict['front_url'] = self.get_public_url(request)

        exclude_keys_list = exclude_keys or []
        include_keys_list = include_keys or []
        for k in list(instance_dict.keys()):
            if k == 'group':
                continue
            if (k in exclude_keys_list or
                    (k not in include_keys_list and include_keys)):
                del instance_dict[k]
        return instance_dict

    def get_previous_in_group(self, request):
        query = {
            "size": 1,
            "query": {
                "filtered": {
                    "filter": {
                        "and": [{"term": {"group_id": self.group_id}},
                                {"range": {"pg_id": {"lt": self.id}}}]
                    }
                }
            },
            "sort": [
                {"_doc": {"order": "desc"}},
            ],
        }
        result = request.es_conn.search(query, index=self.partition_id,
                                        doc_type='report')
        if result['hits']['total']:
            return result['hits']['hits'][0]['_source']['pg_id']

    def get_next_in_group(self, request):
        query = {
            "size": 1,
            "query": {
                "filtered": {
                    "filter": {
                        "and": [{"term": {"group_id": self.group_id}},
                                {"range": {"pg_id": {"gt": self.id}}}]
                    }
                }
            },
            "sort": [
                {"_doc": {"order": "asc"}},
            ],
        }
        result = request.es_conn.search(query, index=self.partition_id,
                                        doc_type='report')
        if result['hits']['total']:
            return result['hits']['hits'][0]['_source']['pg_id']

    def get_public_url(self, request=None, report_group=None, _app_url=None):
        """
        Returns the url that a user can use to visit a specific report
        """
        if not request:
            request = get_current_request()
        url = request.route_url('/', _app_url=_app_url)
        if report_group:
            return (url + 'ui/report/%s/%s') % (report_group.id, self.id)
        return (url + 'ui/report/%s/%s') % (self.group_id, self.id)

    def req_stats(self):
        stats = self.request_stats.copy()
        stats['percentages'] = {}
        stats['percentages']['main'] = 100.0
        main = stats.get('main', 0.0)
        if not main:
            return None
        for name, call_time in stats.items():
            if ('calls' not in name and 'main' not in name and
                    'percentages' not in name):
                stats['main'] -= call_time
                stats['percentages'][name] = math.floor(
                    (call_time / main * 100.0))
                stats['percentages']['main'] -= stats['percentages'][name]
        if stats['percentages']['main'] < 0.0:
            stats['percentages']['main'] = 0.0
            stats['main'] = 0.0
        return stats

    def generate_grouping_hash(self, hash_string=None, default_grouping=None,
                               protocol_version=None):
        """
        Generates the SHA1 hash that will be used to group reports together
        """
        if not hash_string:
            location = self.tags.get('view_name') or self.url_path
            server_name = self.tags.get('server_name') or ''
            if default_grouping == 'url_traceback':
                hash_string = '%s_%s_%s' % (self.traceback_hash, location,
                                            self.error)
                if self.language == Language.javascript:
                    hash_string = '%s_%s' % (self.traceback_hash, self.error)
            elif default_grouping == 'traceback_server':
                hash_string = '%s_%s' % (self.traceback_hash, server_name)
                if self.language == Language.javascript:
                    hash_string = '%s_%s' % (self.traceback_hash, server_name)
            else:
                hash_string = '%s_%s' % (self.error, location)
        binary_string = hash_string.encode('utf8')
        self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
        return self.grouping_hash

    def stripped_traceback(self):
        """
        Traceback without local vars
        """
        stripped_traceback = copy.deepcopy(self.traceback)

        if isinstance(stripped_traceback, list):
            for row in stripped_traceback:
                row.pop('vars', None)
        return stripped_traceback

    def notify_channel(self, report_group):
        """
        Sends a notification to the websocket channel
        """
        settings = get_current_registry().settings
        log.info('notify channelstream')
        if self.report_type != ReportType.error:
            return
        payload = {
            'type': 'message',
            'user': '__system__',
            'channel': 'app_%s' % self.resource_id,
            'message': {
                'topic': 'front_dashboard.new_topic',
                'report': {
                    'group': {
                        'priority': report_group.priority,
                        'first_timestamp': report_group.first_timestamp,
                        'last_timestamp': report_group.last_timestamp,
                        'average_duration': report_group.average_duration,
                        'occurences': report_group.occurences
                    },
                    'report_id': self.id,
                    'group_id': self.group_id,
                    'resource_id': self.resource_id,
                    'http_status': self.http_status,
                    'url_domain': self.url_domain,
                    'url_path': self.url_path,
                    'error': self.error or '',
                    'server': self.tags.get('server_name'),
                    'view_name': self.tags.get('view_name'),
                    'front_url': self.get_public_url(),
                }
            }
        }
        channelstream_request(settings['cometd.secret'], '/message',
                              [payload],
                              servers=[settings['cometd_servers']])

    def es_doc(self):
        tags = {}
        tag_list = []
        for name, value in self.tags.items():
            name = name.replace('.', '_')
            tag_list.append(name)
            tags[name] = {
                "values": convert_es_type(value),
                "numeric_values": value if (
                    isinstance(value, (int, float)) and
                    not isinstance(value, bool)) else None}

        if 'user_name' not in self.tags and self.username:
            tags["user_name"] = {"value": [self.username],
                                 "numeric_value": None}
        return {
            '_id': str(self.id),
            'pg_id': str(self.id),
            'resource_id': self.resource_id,
            'http_status': self.http_status or '',
            'start_time': self.start_time,
            'end_time': self.end_time,
            'url_domain': self.url_domain if self.url_domain else '',
            'url_path': self.url_path if self.url_path else '',
            'duration': self.duration,
            'error': self.error if self.error else '',
            'report_type': self.report_type,
            'request_id': self.request_id,
            'ip': self.ip,
            'group_id': str(self.group_id),
            '_parent': str(self.group_id),
            'tags': tags,
            'tag_list': tag_list
        }

    @property
    def partition_id(self):
        return 'rcae_r_%s' % self.report_group_time.strftime('%Y_%m')


def after_insert(mapper, connection, target):
    if not hasattr(target, '_skip_ft_index'):
        data = target.es_doc()
        data.pop('_id', None)
        Datastores.es.index(target.partition_id, 'report', data,
                            parent=target.group_id, id=target.id)


def after_update(mapper, connection, target):
    if not hasattr(target, '_skip_ft_index'):
        data = target.es_doc()
        data.pop('_id', None)
        Datastores.es.index(target.partition_id, 'report', data,
                            parent=target.group_id, id=target.id)


def after_delete(mapper, connection, target):
    if not hasattr(target, '_skip_ft_index'):
        query = {'term': {'pg_id': target.id}}
        Datastores.es.delete_by_query(target.partition_id, 'report', query)


sa.event.listen(Report, 'after_insert', after_insert)
sa.event.listen(Report, 'after_update', after_update)
sa.event.listen(Report, 'after_delete', after_delete)