##// END OF EJS Templates
project: add missing lib directory
project: add missing lib directory

File last commit:

r2:87908169
r2:87908169
Show More
airbrake.py
147 lines | 5.7 KiB | text/x-python | PythonLexer
project: add missing lib directory
r2 # -*- coding: utf-8 -*-
# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# App Enlight Enterprise Edition, including its added features, Support
# services, and proprietary license terms, please see
# https://rhodecode.com/licenses/
import logging
import uuid
from datetime import datetime
log = logging.getLogger(__name__)
def parse_airbrake_xml(request):
root = request.context.airbrake_xml_etree
error = root.find('error')
notifier = root.find('notifier')
server_env = root.find('server-environment')
request_data = root.find('request')
user = root.find('current-user')
if request_data is not None:
cgi_data = request_data.find('cgi-data')
if cgi_data is None:
cgi_data = []
error_dict = {
'class_name': error.findtext('class') or '',
'error': error.findtext('message') or '',
"occurences": 1,
"http_status": 500,
"priority": 5,
"server": 'unknown',
'url': 'unknown', 'request': {}
}
if user is not None:
error_dict['username'] = user.findtext('username') or \
user.findtext('id')
if notifier is not None:
error_dict['client'] = notifier.findtext('name')
if server_env is not None:
error_dict["server"] = server_env.findtext('hostname', 'unknown')
whitelist_environ = ['REMOTE_USER', 'REMOTE_ADDR', 'SERVER_NAME',
'CONTENT_TYPE', 'HTTP_REFERER']
if request_data is not None:
error_dict['url'] = request_data.findtext('url', 'unknown')
component = request_data.findtext('component')
action = request_data.findtext('action')
if component and action:
error_dict['view_name'] = '%s:%s' % (component, action)
for node in cgi_data:
key = node.get('key')
if key.startswith('HTTP') or key in whitelist_environ:
error_dict['request'][key] = node.text
elif 'query_parameters' in key:
error_dict['request']['GET'] = {}
for x in node:
error_dict['request']['GET'][x.get('key')] = x.text
elif 'request_parameters' in key:
error_dict['request']['POST'] = {}
for x in node:
error_dict['request']['POST'][x.get('key')] = x.text
elif key.endswith('cookie'):
error_dict['request']['COOKIE'] = {}
for x in node:
error_dict['request']['COOKIE'][x.get('key')] = x.text
elif key.endswith('request_id'):
error_dict['request_id'] = node.text
elif key.endswith('session'):
error_dict['request']['SESSION'] = {}
for x in node:
error_dict['request']['SESSION'][x.get('key')] = x.text
else:
if key in ['rack.session.options']:
# skip secret configs
continue
try:
if len(node):
error_dict['request'][key] = dict(
[(x.get('key'), x.text,) for x in node])
else:
error_dict['request'][key] = node.text
except Exception as e:
log.warning('Airbrake integration exception: %s' % e)
error_dict['request'].pop('HTTP_COOKIE', '')
error_dict['ip'] = error_dict.pop('REMOTE_ADDR', '')
error_dict['user_agent'] = error_dict.pop('HTTP_USER_AGENT', '')
if 'request_id' not in error_dict:
error_dict['request_id'] = str(uuid.uuid4())
if request.context.possibly_public:
# set ip for reports that come from airbrake js client
error_dict["timestamp"] = datetime.utcnow()
if request.environ.get("HTTP_X_FORWARDED_FOR"):
ip = request.environ.get("HTTP_X_FORWARDED_FOR", '')
first_ip = ip.split(',')[0]
remote_addr = first_ip.strip()
else:
remote_addr = (request.environ.get("HTTP_X_REAL_IP") or
request.environ.get('REMOTE_ADDR'))
error_dict["ip"] = remote_addr
blacklist = ['password', 'passwd', 'pwd', 'auth_tkt', 'secret', 'csrf',
'session', 'test']
lines = []
for l in error.find('backtrace'):
lines.append({'file': l.get("file", ""),
'line': l.get("number", ""),
'fn': l.get("method", ""),
'module': l.get("module", ""),
'cline': l.get("method", ""),
'vars': {}})
error_dict['traceback'] = list(reversed(lines))
# filtering is not provided by airbrake
keys_to_check = (
error_dict['request'].get('COOKIE'),
error_dict['request'].get('COOKIES'),
error_dict['request'].get('POST'),
error_dict['request'].get('SESSION'),
)
for source in [_f for _f in keys_to_check if _f]:
for k in source.keys():
for bad_key in blacklist:
if bad_key in k.lower():
source[k] = '***'
return error_dict