|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
|
|
|
#
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
# You may obtain a copy of the License at
|
|
|
#
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
#
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
# See the License for the specific language governing permissions and
|
|
|
# limitations under the License.
|
|
|
|
|
|
import base64
|
|
|
import io
|
|
|
import datetime
|
|
|
import json
|
|
|
import logging
|
|
|
import urllib.request, urllib.parse, urllib.error
|
|
|
import zlib
|
|
|
|
|
|
from gzip import GzipFile
|
|
|
from pyramid.view import view_config
|
|
|
from pyramid.httpexceptions import HTTPBadRequest
|
|
|
|
|
|
import appenlight.celery.tasks as tasks
|
|
|
from appenlight.lib.api import rate_limiting, check_cors
|
|
|
from appenlight.lib.enums import ParsedSentryEventType
|
|
|
from appenlight.lib.utils import parse_proto
|
|
|
from appenlight.lib.utils.airbrake import parse_airbrake_xml
|
|
|
from appenlight.lib.utils.date_utils import convert_date
|
|
|
from appenlight.lib.utils.sentry import parse_sentry_event
|
|
|
from appenlight.lib.request import JSONException
|
|
|
from appenlight.validators import (
|
|
|
LogListSchema,
|
|
|
MetricsListSchema,
|
|
|
GeneralMetricsListSchema,
|
|
|
GeneralMetricsPermanentListSchema,
|
|
|
GeneralMetricSchema,
|
|
|
GeneralMetricPermanentSchema,
|
|
|
LogListPermanentSchema,
|
|
|
ReportListSchema_0_5,
|
|
|
LogSchema,
|
|
|
LogSchemaPermanent,
|
|
|
ReportSchema_0_5,
|
|
|
)
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
@view_config(
    route_name="api_logs", renderer="string", permission="create", require_csrf=False
)
@view_config(
    route_name="api_log", renderer="string", permission="create", require_csrf=False
)
def logs_create(request):
    """
    Endpoint for log aggregation.

    The ``api_logs`` route takes a sequence of log entries, ``api_log`` takes
    a single entry.  Entries are validated, counted against the per
    application log rate limit and handed over to the ``add_logs`` task.

    Entries without a primary key are queued unchanged.  Entries that carry
    a primary key are grouped by ``(primary_key, namespace)`` and only the
    entry with the latest ``date`` of each group is queued.

    :param request: current request; ``request.context.resource`` is the
        application the logs are sent to
    :returns: result of ``check_cors`` for OPTIONS requests, otherwise the
        string ``"OK: Logs accepted"``
    """
    application = request.context.resource
    if request.method.upper() == "OPTIONS":
        return check_cors(request, application)
    check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get("protocol_version", ""))
    payload = request.unsafe_json_body
    sequence_accepted = request.matched_route.name == "api_logs"

    # pick list/single and permanent/regular flavour of the validator
    permanent = application.allow_permanent_storage
    if sequence_accepted:
        schema_cls = LogListPermanentSchema if permanent else LogListSchema
    else:
        schema_cls = LogSchemaPermanent if permanent else LogSchema
    schema = schema_cls().bind(utcnow=datetime.datetime.utcnow())

    deserialized_logs = schema.deserialize(payload)
    if not sequence_accepted:
        # single entry endpoint - normalize to a list
        deserialized_logs = [deserialized_logs]

    rate_limiting(
        request, application, "per_application_logs_rate_limit", len(deserialized_logs)
    )

    # entries with a primary key are split out and grouped so that they can
    # be reduced to one entry per (primary_key, namespace) pair
    non_pkey_logs = []
    pkey_dict = {}
    for entry in deserialized_logs:
        if entry["primary_key"]:
            group_key = (entry["primary_key"], entry["namespace"])
            pkey_dict.setdefault(group_key, []).append(entry)
        else:
            non_pkey_logs.append(entry)

    if non_pkey_logs:
        log.debug("%s non-pkey logs received: %s" % (application, len(non_pkey_logs)))
        tasks.add_logs.delay(application.resource_id, params, non_pkey_logs)

    if pkey_dict:
        # keep only the most recent entry of every group
        logs_to_insert = [
            sorted(group, key=lambda item: item["date"])[-1]
            for group in pkey_dict.values()
        ]
        log.debug("%s pkey logs received: %s" % (application, len(logs_to_insert)))
        tasks.add_logs.delay(application.resource_id, params, logs_to_insert)

    user_agent = request.headers.get("user_agent")
    log.info("LOG call %s %s client:%s" % (application, proto_version, user_agent))
    return "OK: Logs accepted"
|
|
|
|
|
|
|
|
|
@view_config(
    route_name="api_request_stats",
    renderer="string",
    permission="create",
    require_csrf=False,
)
@view_config(
    route_name="api_metrics", renderer="string", permission="create", require_csrf=False
)
def request_metrics_create(request):
    """
    Endpoint for performance metrics, aggregates view performance stats
    and converts them to general metric rows.

    Incoming rows are summed per (server, minute, view name) so that several
    rows reported for the same minute end up as a single metric, then the
    aggregated rows are handed over to the ``add_metrics`` task.

    :param request: current request; ``request.context.resource`` is the
        application the metrics are sent to
    :returns: result of ``check_cors`` for OPTIONS requests, otherwise the
        string ``"OK: request metrics accepted"``
    """
    application = request.context.resource
    if request.method.upper() == "OPTIONS":
        return check_cors(request, application)
    else:
        check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get("protocol_version", ""))

    payload = request.unsafe_json_body
    schema = MetricsListSchema()
    dataset = schema.deserialize(payload)

    rate_limiting(
        request, application, "per_application_metrics_rate_limit", len(dataset)
    )

    # metrics dict is being built to minimize the amount of queries used
    # in case we get multiple rows from same minute
    metrics = {}
    for metric in dataset:
        server_name = metric.get("server", "").lower() or "unknown"
        # truncate the timestamp to the start of its minute
        start_interval = convert_date(metric["timestamp"])
        start_interval = start_interval.replace(second=0, microsecond=0)

        for view_name, view_metrics in metric["metrics"]:
            key = "%s%s%s" % (metric["server"], start_interval, view_name)
            # the accumulator is stored under ``key``, so that is what has
            # to be looked up; testing ``start_interval`` here would reset
            # the counters on every row and nothing would be aggregated
            if key not in metrics:
                metrics[key] = {
                    "requests": 0,
                    "main": 0,
                    "sql": 0,
                    "nosql": 0,
                    "remote": 0,
                    "tmpl": 0,
                    "custom": 0,
                    "sql_calls": 0,
                    "nosql_calls": 0,
                    "remote_calls": 0,
                    "tmpl_calls": 0,
                    "custom_calls": 0,
                    "start_interval": start_interval,
                    "server_name": server_name,
                    "view_name": view_name,
                }
            row = metrics[key]
            row["requests"] += int(view_metrics["requests"])
            row["main"] += round(view_metrics["main"], 5)
            row["sql"] += round(view_metrics["sql"], 5)
            row["nosql"] += round(view_metrics["nosql"], 5)
            row["remote"] += round(view_metrics["remote"], 5)
            row["tmpl"] += round(view_metrics["tmpl"], 5)
            row["custom"] += round(view_metrics.get("custom", 0.0), 5)
            row["sql_calls"] += int(view_metrics.get("sql_calls", 0))
            row["nosql_calls"] += int(view_metrics.get("nosql_calls", 0))
            row["remote_calls"] += int(view_metrics.get("remote_calls", 0))
            row["tmpl_calls"] += int(view_metrics.get("tmpl_calls", 0))
            row["custom_calls"] += int(view_metrics.get("custom_calls", 0))

            if not row["requests"]:
                # fix this here because validator can't
                row["requests"] = 1

    normalized_metrics = []
    for metric in metrics.values():
        # "start_interval" is popped first so it becomes the timestamp and
        # is not repeated in the tags built from the remaining items
        new_metric = {
            "namespace": "appenlight.request_metric",
            "timestamp": metric.pop("start_interval"),
            "server_name": metric["server_name"],
            "tags": list(metric.items()),
        }
        normalized_metrics.append(new_metric)

    tasks.add_metrics.delay(
        application.resource_id, params, normalized_metrics, proto_version
    )

    log.info(
        "REQUEST METRICS call {} {} client:{}".format(
            application.resource_name, proto_version, request.headers.get("user_agent")
        )
    )
    return "OK: request metrics accepted"
|
|
|
|
|
|
|
|
|
@view_config(
    route_name="api_general_metrics",
    renderer="string",
    permission="create",
    require_csrf=False,
)
@view_config(
    route_name="api_general_metric",
    renderer="string",
    permission="create",
    require_csrf=False,
)
def general_metrics_create(request):
    """
    Endpoint for general metrics aggregation.

    The ``api_general_metrics`` route takes a sequence of metrics,
    ``api_general_metric`` takes a single one.  The payload is validated,
    counted against the per application metrics rate limit and handed over
    to the ``add_metrics`` task.

    :param request: current request; ``request.context.resource`` is the
        application the metrics are sent to
    :returns: result of ``check_cors`` for OPTIONS requests, otherwise the
        string ``"OK: Metrics accepted"``
    """
    application = request.context.resource
    if request.method.upper() == "OPTIONS":
        return check_cors(request, application)
    check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get("protocol_version", ""))
    payload = request.unsafe_json_body
    sequence_accepted = request.matched_route.name == "api_general_metrics"

    # pick list/single and permanent/regular flavour of the validator
    permanent = application.allow_permanent_storage
    if sequence_accepted:
        schema_cls = (
            GeneralMetricsPermanentListSchema if permanent else GeneralMetricsListSchema
        )
    else:
        schema_cls = GeneralMetricPermanentSchema if permanent else GeneralMetricSchema
    schema = schema_cls().bind(utcnow=datetime.datetime.utcnow())

    deserialized_metrics = schema.deserialize(payload)
    if not sequence_accepted:
        # single metric endpoint - normalize to a list
        deserialized_metrics = [deserialized_metrics]

    metric_count = len(deserialized_metrics)
    rate_limiting(
        request, application, "per_application_metrics_rate_limit", metric_count
    )

    tasks.add_metrics.delay(
        application.resource_id, params, deserialized_metrics, proto_version
    )

    user_agent = request.headers.get("user_agent")
    log.info(
        "METRICS call {} {} client:{}".format(
            application.resource_name, proto_version, user_agent
        )
    )
    return "OK: Metrics accepted"
|
|
|
|
|
|
|
|
|
@view_config(
    route_name="api_reports", renderer="string", permission="create", require_csrf=False
)
@view_config(
    route_name="api_slow_reports",
    renderer="string",
    permission="create",
    require_csrf=False,
)
@view_config(
    route_name="api_report", renderer="string", permission="create", require_csrf=False
)
def reports_create(request):
    """
    Endpoint for exception and slowness reports.

    Only the ``api_reports`` route is validated as a sequence of reports;
    the other routes are validated as a single report.  Validated reports
    are counted against the per application reports rate limit and handed
    over to the ``add_reports`` task.

    :param request: current request; ``request.context.resource`` is the
        application the reports are sent to
    :returns: result of ``check_cors`` for OPTIONS requests, otherwise the
        string ``"OK: Reports accepted"``
    """
    application = request.context.resource
    if request.method.upper() == "OPTIONS":
        return check_cors(request, application)
    check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get("protocol_version", ""))
    payload = request.unsafe_json_body
    sequence_accepted = request.matched_route.name == "api_reports"

    schema_cls = ReportListSchema_0_5 if sequence_accepted else ReportSchema_0_5
    schema = schema_cls().bind(utcnow=datetime.datetime.utcnow())

    deserialized_reports = schema.deserialize(payload)
    if not sequence_accepted:
        # single report endpoint - normalize to a list
        deserialized_reports = [deserialized_reports]

    if deserialized_reports:
        report_count = len(deserialized_reports)
        rate_limiting(
            request, application, "per_application_reports_rate_limit", report_count
        )
        # NOTE(review): queuing is kept under the same non-empty guard as
        # the rate limiting - confirm against the original indentation
        tasks.add_reports.delay(application.resource_id, params, deserialized_reports)

    user_agent = request.headers.get("user_agent")
    log.info(
        "REPORT call %s, %s client:%s" % (application, proto_version, user_agent)
    )
    return "OK: Reports accepted"
|
|
|
|
|
|
|
|
|
@view_config(
    route_name="api_airbrake",
    renderer="string",
    permission="create",
    require_csrf=False,
)
def airbrake_xml_compat(request):
    """
    Airbrake compatible endpoint for XML reports.

    The request is parsed with ``parse_airbrake_xml``, validated as a one
    element report list, counted against the per application reports rate
    limit and handed over to the ``add_reports`` task.

    :param request: current request; ``request.context.resource`` is the
        application the report is sent to
    :returns: result of ``check_cors`` for OPTIONS requests, otherwise an
        XML ``<notice>`` string containing the ``mailing.app_url`` setting
    """
    application = request.context.resource
    if request.method.upper() == "OPTIONS":
        return check_cors(request, application)
    check_cors(request, application, should_return=False)

    params = dict(request.params.copy())

    error_dict = parse_airbrake_xml(request)
    schema = ReportListSchema_0_5().bind(utcnow=datetime.datetime.utcnow())
    deserialized_reports = schema.deserialize([error_dict])

    report_count = len(deserialized_reports)
    rate_limiting(
        request, application, "per_application_reports_rate_limit", report_count
    )

    tasks.add_reports.delay(application.resource_id, params, deserialized_reports)

    log_args = (
        500,
        application.resource_name,
        request.params.get("protocol_version", "unknown"),
        request.headers.get("user_agent"),
    )
    log.info("%s AIRBRAKE call for application %s, api_ver:%s client:%s" % log_args)

    app_url = request.registry.settings["mailing.app_url"]
    return "<notice><id>no-id</id><url>%s</url></notice>" % app_url
|
|
|
|
|
|
|
|
|
def decompress_gzip(data):
    """
    Decompress a gzip compressed payload.

    :param data: gzip compressed payload (bytes)
    :returns: decompressed payload as bytes
    :raises HTTPBadRequest: when the payload can not be decompressed
    """
    try:
        # the payload is binary so it has to be wrapped in BytesIO,
        # StringIO refuses bytes input
        with GzipFile(fileobj=io.BytesIO(data)) as f:
            return f.read()
    except Exception as exc:
        # broken payload is a client error - log it and answer with 400
        log.error(exc)
        raise HTTPBadRequest()
|
|
|
|
|
|
|
|
|
def decompress_zlib(data):
    """
    Decompress a zlib (deflate) compressed payload.

    :param data: zlib compressed payload (bytes)
    :returns: decompressed payload as bytes
    :raises HTTPBadRequest: when the payload can not be decompressed
    """
    try:
        return zlib.decompress(data)
    except Exception as exc:
        # broken payload is a client error - log it and answer with 400
        log.error(exc)
        raise HTTPBadRequest()
|
|
|
|
|
|
|
|
|
def decode_b64(data):
    """
    Decode a base64 encoded payload.

    :param data: base64 encoded payload (bytes or ASCII string)
    :returns: decoded payload as bytes
    :raises HTTPBadRequest: when the payload can not be decoded
    """
    try:
        return base64.b64decode(data)
    except Exception as exc:
        # broken payload is a client error - log it and answer with 400
        log.error(exc)
        raise HTTPBadRequest()
|
|
|
|
|
|
|
|
|
@view_config(
    route_name="api_sentry", renderer="string", permission="create", require_csrf=False
)
@view_config(
    route_name="api_sentry_slash",
    renderer="string",
    permission="create",
    require_csrf=False,
)
def sentry_compat(request):
    """
    Sentry compatible endpoint.

    The request body is decoded (gzip / deflate content encoding, url
    quoting for form encoded requests, base64 + zlib for bodies that do not
    start with ``{``), parsed as JSON and converted with
    ``parse_sentry_event``.  Log events are handed over to the ``add_logs``
    task, error reports are rate limited and handed over to the
    ``add_reports`` task.

    :param request: current request; ``request.context.resource`` is the
        application the event is sent to
    :returns: result of ``check_cors`` for OPTIONS requests, otherwise the
        string ``"OK: Events accepted"``
    :raises JSONException: when the decoded body is not valid JSON
    """
    application = request.context.resource
    if request.method.upper() == "OPTIONS":
        return check_cors(request, application)
    else:
        check_cors(request, application, should_return=False)

    # handle various report encoding
    content_encoding = request.headers.get("Content-Encoding")
    content_type = request.headers.get("Content-Type")
    if content_encoding == "gzip":
        body = decompress_gzip(request.body)
    elif content_encoding == "deflate":
        body = decompress_zlib(request.body)
    else:
        body = request.body
        # attempt to fix string before decoding for stupid clients
        if content_type == "application/x-www-form-urlencoded":
            # from here on body is text, not bytes
            body = urllib.parse.unquote(body.decode("utf8"))
    check_char = "{" if isinstance(body, str) else b"{"
    if not body.startswith(check_char):
        # not plain JSON - try base64 encoded zlib stream, on failure the
        # body is left untouched and rejected by the JSON parsing below
        try:
            body = decode_b64(body)
            body = decompress_zlib(body)
        except Exception as exc:
            log.info(exc)

    try:
        # body is already text for form encoded requests; calling
        # .decode() on str would raise AttributeError instead of being
        # reported as incorrect JSON
        if isinstance(body, bytes):
            body = body.decode("utf8")
        json_body = json.loads(body)
    except ValueError:
        raise JSONException("Incorrect JSON")

    event, event_type = parse_sentry_event(json_body)

    if event_type == ParsedSentryEventType.LOG:
        if application.allow_permanent_storage:
            schema = LogSchemaPermanent().bind(utcnow=datetime.datetime.utcnow())
        else:
            schema = LogSchema().bind(utcnow=datetime.datetime.utcnow())
        deserialized_logs = schema.deserialize(event)
        non_pkey_logs = [deserialized_logs]
        log.debug("%s non-pkey logs received: %s" % (application, len(non_pkey_logs)))
        tasks.add_logs.delay(application.resource_id, {}, non_pkey_logs)
    if event_type == ParsedSentryEventType.ERROR_REPORT:
        schema = ReportSchema_0_5().bind(
            utcnow=datetime.datetime.utcnow(),
            allow_permanent_storage=application.allow_permanent_storage,
        )
        deserialized_reports = [schema.deserialize(event)]
        rate_limiting(
            request,
            application,
            "per_application_reports_rate_limit",
            len(deserialized_reports),
        )
        tasks.add_reports.delay(application.resource_id, {}, deserialized_reports)
    return "OK: Events accepted"
|
|
|
|