# -*- coding: utf-8 -*- # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import uuid from datetime import datetime log = logging.getLogger(__name__) def parse_airbrake_xml(request): root = request.context.airbrake_xml_etree error = root.find("error") notifier = root.find("notifier") server_env = root.find("server-environment") request_data = root.find("request") user = root.find("current-user") if request_data is not None: cgi_data = request_data.find("cgi-data") if cgi_data is None: cgi_data = [] error_dict = { "class_name": error.findtext("class") or "", "error": error.findtext("message") or "", "occurences": 1, "http_status": 500, "priority": 5, "server": "unknown", "url": "unknown", "request": {}, } if user is not None: error_dict["username"] = user.findtext("username") or user.findtext("id") if notifier is not None: error_dict["client"] = notifier.findtext("name") if server_env is not None: error_dict["server"] = server_env.findtext("hostname", "unknown") whitelist_environ = [ "REMOTE_USER", "REMOTE_ADDR", "SERVER_NAME", "CONTENT_TYPE", "HTTP_REFERER", ] if request_data is not None: error_dict["url"] = request_data.findtext("url", "unknown") component = request_data.findtext("component") action = request_data.findtext("action") if component and action: error_dict["view_name"] = "%s:%s" % (component, action) for node in cgi_data: key = node.get("key") if key.startswith("HTTP") or key in whitelist_environ: error_dict["request"][key] = node.text elif "query_parameters" in key: error_dict["request"]["GET"] = {} for x in node: error_dict["request"]["GET"][x.get("key")] = x.text elif "request_parameters" in key: error_dict["request"]["POST"] = {} for x in node: error_dict["request"]["POST"][x.get("key")] = x.text elif key.endswith("cookie"): error_dict["request"]["COOKIE"] = {} for x in node: error_dict["request"]["COOKIE"][x.get("key")] = x.text elif key.endswith("request_id"): error_dict["request_id"] = node.text elif key.endswith("session"): error_dict["request"]["SESSION"] = {} for x in node: error_dict["request"]["SESSION"][x.get("key")] = x.text else: if key in ["rack.session.options"]: # skip secret configs continue try: if len(node): error_dict["request"][key] = dict( [(x.get("key"), x.text) for x in node] ) else: error_dict["request"][key] = node.text except Exception as e: log.warning("Airbrake integration exception: %s" % e) error_dict["request"].pop("HTTP_COOKIE", "") error_dict["ip"] = error_dict.pop("REMOTE_ADDR", "") error_dict["user_agent"] = error_dict.pop("HTTP_USER_AGENT", "") if "request_id" not in error_dict: error_dict["request_id"] = str(uuid.uuid4()) if request.context.possibly_public: # set ip for reports that come from airbrake js client error_dict["timestamp"] = datetime.utcnow() if request.environ.get("HTTP_X_FORWARDED_FOR"): ip = request.environ.get("HTTP_X_FORWARDED_FOR", "") first_ip = ip.split(",")[0] remote_addr = first_ip.strip() else: remote_addr = request.environ.get("HTTP_X_REAL_IP") or request.environ.get( "REMOTE_ADDR" ) error_dict["ip"] = remote_addr blacklist = [ "password", "passwd", "pwd", "auth_tkt", "secret", "csrf", "session", "test", ] lines = [] for l in error.find("backtrace"): lines.append( { "file": l.get("file", ""), "line": l.get("number", ""), "fn": l.get("method", ""), "module": l.get("module", ""), "cline": l.get("method", ""), "vars": {}, } ) error_dict["traceback"] = list(reversed(lines)) # filtering is not provided by airbrake keys_to_check = ( error_dict["request"].get("COOKIE"), error_dict["request"].get("COOKIES"), error_dict["request"].get("POST"), error_dict["request"].get("SESSION"), ) for source in [_f for _f in keys_to_check if _f]: for k in source.keys(): for bad_key in blacklist: if bad_key in k.lower(): source[k] = "***" return error_dict