# -*- coding: utf-8 -*- # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import base64 import io import datetime import json import logging import urllib.request, urllib.parse, urllib.error import zlib from gzip import GzipFile from pyramid.view import view_config from pyramid.httpexceptions import HTTPBadRequest import appenlight.celery.tasks as tasks from appenlight.lib.api import rate_limiting, check_cors from appenlight.lib.enums import ParsedSentryEventType from appenlight.lib.utils import parse_proto from appenlight.lib.utils.airbrake import parse_airbrake_xml from appenlight.lib.utils.date_utils import convert_date from appenlight.lib.utils.sentry import parse_sentry_event from appenlight.lib.request import JSONException from appenlight.validators import ( LogListSchema, MetricsListSchema, GeneralMetricsListSchema, GeneralMetricsPermanentListSchema, GeneralMetricSchema, GeneralMetricPermanentSchema, LogListPermanentSchema, ReportListSchema_0_5, LogSchema, LogSchemaPermanent, ReportSchema_0_5, ) log = logging.getLogger(__name__) @view_config( route_name="api_logs", renderer="string", permission="create", require_csrf=False ) @view_config( route_name="api_log", renderer="string", permission="create", require_csrf=False ) def logs_create(request): """ Endpoint for log aggregation """ application = request.context.resource if request.method.upper() == "OPTIONS": return check_cors(request, application) else: check_cors(request, application, should_return=False) params = dict(request.params.copy()) proto_version = parse_proto(params.get("protocol_version", "")) payload = request.unsafe_json_body sequence_accepted = request.matched_route.name == "api_logs" if sequence_accepted: if application.allow_permanent_storage: schema = LogListPermanentSchema().bind(utcnow=datetime.datetime.utcnow()) else: schema = LogListSchema().bind(utcnow=datetime.datetime.utcnow()) else: if application.allow_permanent_storage: schema = LogSchemaPermanent().bind(utcnow=datetime.datetime.utcnow()) else: schema = LogSchema().bind(utcnow=datetime.datetime.utcnow()) deserialized_logs = schema.deserialize(payload) if sequence_accepted is False: deserialized_logs = [deserialized_logs] rate_limiting( request, application, "per_application_logs_rate_limit", len(deserialized_logs) ) # pprint.pprint(deserialized_logs) # we need to split those out so we can process the pkey ones one by one non_pkey_logs = [ log_dict for log_dict in deserialized_logs if not log_dict["primary_key"] ] pkey_dict = {} # try to process the logs as best as we can and group together to reduce # the amount of for log_dict in deserialized_logs: if log_dict["primary_key"]: key = (log_dict["primary_key"], log_dict["namespace"]) if not key in pkey_dict: pkey_dict[key] = [] pkey_dict[key].append(log_dict) if non_pkey_logs: log.debug("%s non-pkey logs received: %s" % (application, len(non_pkey_logs))) tasks.add_logs.delay(application.resource_id, params, non_pkey_logs) if pkey_dict: logs_to_insert = [] for primary_key_tuple, payload in pkey_dict.items(): sorted_logs = sorted(payload, key=lambda x: x["date"]) logs_to_insert.append(sorted_logs[-1]) log.debug("%s pkey logs received: %s" % (application, len(logs_to_insert))) tasks.add_logs.delay(application.resource_id, params, logs_to_insert) log.info( "LOG call %s %s client:%s" % (application, proto_version, request.headers.get("user_agent")) ) return "OK: Logs accepted" @view_config( route_name="api_request_stats", renderer="string", permission="create", require_csrf=False, ) @view_config( route_name="api_metrics", renderer="string", permission="create", require_csrf=False ) def request_metrics_create(request): """ Endpoint for performance metrics, aggregates view performance stats and converts them to general metric row """ application = request.context.resource if request.method.upper() == "OPTIONS": return check_cors(request, application) else: check_cors(request, application, should_return=False) params = dict(request.params.copy()) proto_version = parse_proto(params.get("protocol_version", "")) payload = request.unsafe_json_body schema = MetricsListSchema() dataset = schema.deserialize(payload) rate_limiting( request, application, "per_application_metrics_rate_limit", len(dataset) ) # looping report data metrics = {} for metric in dataset: server_name = metric.get("server", "").lower() or "unknown" start_interval = convert_date(metric["timestamp"]) start_interval = start_interval.replace(second=0, microsecond=0) for view_name, view_metrics in metric["metrics"]: key = "%s%s%s" % (metric["server"], start_interval, view_name) if start_interval not in metrics: metrics[key] = { "requests": 0, "main": 0, "sql": 0, "nosql": 0, "remote": 0, "tmpl": 0, "custom": 0, "sql_calls": 0, "nosql_calls": 0, "remote_calls": 0, "tmpl_calls": 0, "custom_calls": 0, "start_interval": start_interval, "server_name": server_name, "view_name": view_name, } metrics[key]["requests"] += int(view_metrics["requests"]) metrics[key]["main"] += round(view_metrics["main"], 5) metrics[key]["sql"] += round(view_metrics["sql"], 5) metrics[key]["nosql"] += round(view_metrics["nosql"], 5) metrics[key]["remote"] += round(view_metrics["remote"], 5) metrics[key]["tmpl"] += round(view_metrics["tmpl"], 5) metrics[key]["custom"] += round(view_metrics.get("custom", 0.0), 5) metrics[key]["sql_calls"] += int(view_metrics.get("sql_calls", 0)) metrics[key]["nosql_calls"] += int(view_metrics.get("nosql_calls", 0)) metrics[key]["remote_calls"] += int(view_metrics.get("remote_calls", 0)) metrics[key]["tmpl_calls"] += int(view_metrics.get("tmpl_calls", 0)) metrics[key]["custom_calls"] += int(view_metrics.get("custom_calls", 0)) if not metrics[key]["requests"]: # fix this here because validator can't metrics[key]["requests"] = 1 # metrics dict is being built to minimize # the amount of queries used # in case we get multiple rows from same minute normalized_metrics = [] for metric in metrics.values(): new_metric = { "namespace": "appenlight.request_metric", "timestamp": metric.pop("start_interval"), "server_name": metric["server_name"], "tags": list(metric.items()), } normalized_metrics.append(new_metric) tasks.add_metrics.delay( application.resource_id, params, normalized_metrics, proto_version ) log.info( "REQUEST METRICS call {} {} client:{}".format( application.resource_name, proto_version, request.headers.get("user_agent") ) ) return "OK: request metrics accepted" @view_config( route_name="api_general_metrics", renderer="string", permission="create", require_csrf=False, ) @view_config( route_name="api_general_metric", renderer="string", permission="create", require_csrf=False, ) def general_metrics_create(request): """ Endpoint for general metrics aggregation """ application = request.context.resource if request.method.upper() == "OPTIONS": return check_cors(request, application) else: check_cors(request, application, should_return=False) params = dict(request.params.copy()) proto_version = parse_proto(params.get("protocol_version", "")) payload = request.unsafe_json_body sequence_accepted = request.matched_route.name == "api_general_metrics" if sequence_accepted: if application.allow_permanent_storage: schema = GeneralMetricsPermanentListSchema().bind( utcnow=datetime.datetime.utcnow() ) else: schema = GeneralMetricsListSchema().bind(utcnow=datetime.datetime.utcnow()) else: if application.allow_permanent_storage: schema = GeneralMetricPermanentSchema().bind( utcnow=datetime.datetime.utcnow() ) else: schema = GeneralMetricSchema().bind(utcnow=datetime.datetime.utcnow()) deserialized_metrics = schema.deserialize(payload) if sequence_accepted is False: deserialized_metrics = [deserialized_metrics] rate_limiting( request, application, "per_application_metrics_rate_limit", len(deserialized_metrics), ) tasks.add_metrics.delay( application.resource_id, params, deserialized_metrics, proto_version ) log.info( "METRICS call {} {} client:{}".format( application.resource_name, proto_version, request.headers.get("user_agent") ) ) return "OK: Metrics accepted" @view_config( route_name="api_reports", renderer="string", permission="create", require_csrf=False ) @view_config( route_name="api_slow_reports", renderer="string", permission="create", require_csrf=False, ) @view_config( route_name="api_report", renderer="string", permission="create", require_csrf=False ) def reports_create(request): """ Endpoint for exception and slowness reports """ # route_url('reports') application = request.context.resource if request.method.upper() == "OPTIONS": return check_cors(request, application) else: check_cors(request, application, should_return=False) params = dict(request.params.copy()) proto_version = parse_proto(params.get("protocol_version", "")) payload = request.unsafe_json_body sequence_accepted = request.matched_route.name == "api_reports" if sequence_accepted: schema = ReportListSchema_0_5().bind(utcnow=datetime.datetime.utcnow()) else: schema = ReportSchema_0_5().bind(utcnow=datetime.datetime.utcnow()) deserialized_reports = schema.deserialize(payload) if sequence_accepted is False: deserialized_reports = [deserialized_reports] if deserialized_reports: rate_limiting( request, application, "per_application_reports_rate_limit", len(deserialized_reports), ) # pprint.pprint(deserialized_reports) tasks.add_reports.delay(application.resource_id, params, deserialized_reports) log.info( "REPORT call %s, %s client:%s" % (application, proto_version, request.headers.get("user_agent")) ) return "OK: Reports accepted" @view_config( route_name="api_airbrake", renderer="string", permission="create", require_csrf=False, ) def airbrake_xml_compat(request): """ Airbrake compatible endpoint for XML reports """ application = request.context.resource if request.method.upper() == "OPTIONS": return check_cors(request, application) else: check_cors(request, application, should_return=False) params = dict(request.params.copy()) error_dict = parse_airbrake_xml(request) schema = ReportListSchema_0_5().bind(utcnow=datetime.datetime.utcnow()) deserialized_reports = schema.deserialize([error_dict]) rate_limiting( request, application, "per_application_reports_rate_limit", len(deserialized_reports), ) tasks.add_reports.delay(application.resource_id, params, deserialized_reports) log.info( "%s AIRBRAKE call for application %s, api_ver:%s client:%s" % ( 500, application.resource_name, request.params.get("protocol_version", "unknown"), request.headers.get("user_agent"), ) ) return ( "no-id%s" % request.registry.settings["mailing.app_url"] ) def decompress_gzip(data): try: fp = io.StringIO(data) with GzipFile(fileobj=fp) as f: return f.read() except Exception as exc: raise log.error(exc) raise HTTPBadRequest() def decompress_zlib(data): try: return zlib.decompress(data) except Exception as exc: raise log.error(exc) raise HTTPBadRequest() def decode_b64(data): try: return base64.b64decode(data) except Exception as exc: raise log.error(exc) raise HTTPBadRequest() @view_config( route_name="api_sentry", renderer="string", permission="create", require_csrf=False ) @view_config( route_name="api_sentry_slash", renderer="string", permission="create", require_csrf=False, ) def sentry_compat(request): """ Sentry compatible endpoint """ application = request.context.resource if request.method.upper() == "OPTIONS": return check_cors(request, application) else: check_cors(request, application, should_return=False) # handle various report encoding content_encoding = request.headers.get("Content-Encoding") content_type = request.headers.get("Content-Type") if content_encoding == "gzip": body = decompress_gzip(request.body) elif content_encoding == "deflate": body = decompress_zlib(request.body) else: body = request.body # attempt to fix string before decoding for stupid clients if content_type == "application/x-www-form-urlencoded": body = urllib.parse.unquote(body.decode("utf8")) check_char = "{" if isinstance(body, str) else b"{" if not body.startswith(check_char): try: body = decode_b64(body) body = decompress_zlib(body) except Exception as exc: log.info(exc) try: json_body = json.loads(body.decode("utf8")) except ValueError: raise JSONException("Incorrect JSON") event, event_type = parse_sentry_event(json_body) if event_type == ParsedSentryEventType.LOG: if application.allow_permanent_storage: schema = LogSchemaPermanent().bind(utcnow=datetime.datetime.utcnow()) else: schema = LogSchema().bind(utcnow=datetime.datetime.utcnow()) deserialized_logs = schema.deserialize(event) non_pkey_logs = [deserialized_logs] log.debug("%s non-pkey logs received: %s" % (application, len(non_pkey_logs))) tasks.add_logs.delay(application.resource_id, {}, non_pkey_logs) if event_type == ParsedSentryEventType.ERROR_REPORT: schema = ReportSchema_0_5().bind( utcnow=datetime.datetime.utcnow(), allow_permanent_storage=application.allow_permanent_storage, ) deserialized_reports = [schema.deserialize(event)] rate_limiting( request, application, "per_application_reports_rate_limit", len(deserialized_reports), ) tasks.add_reports.delay(application.resource_id, {}, deserialized_reports) return "OK: Events accepted"