# -*- coding: utf-8 -*-
# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import io
import datetime
import json
import logging
import urllib.parse
import zlib
from gzip import GzipFile
from pyramid.view import view_config
from pyramid.httpexceptions import HTTPBadRequest
import appenlight.celery.tasks as tasks
from appenlight.lib.api import rate_limiting, check_cors
from appenlight.lib.enums import ParsedSentryEventType
from appenlight.lib.utils import parse_proto
from appenlight.lib.utils.airbrake import parse_airbrake_xml
from appenlight.lib.utils.date_utils import convert_date
from appenlight.lib.utils.sentry import parse_sentry_event
from appenlight.lib.request import JSONException
from appenlight.validators import (
LogListSchema,
MetricsListSchema,
GeneralMetricsListSchema,
GeneralMetricsPermanentListSchema,
GeneralMetricSchema,
GeneralMetricPermanentSchema,
LogListPermanentSchema,
ReportListSchema_0_5,
LogSchema,
LogSchemaPermanent,
ReportSchema_0_5,
)

log = logging.getLogger(__name__)


@view_config(
route_name="api_logs", renderer="string", permission="create", require_csrf=False
)
@view_config(
route_name="api_log", renderer="string", permission="create", require_csrf=False
)
def logs_create(request):
"""
Endpoint for log aggregation
"""
application = request.context.resource
if request.method.upper() == "OPTIONS":
return check_cors(request, application)
else:
check_cors(request, application, should_return=False)
params = dict(request.params.copy())
proto_version = parse_proto(params.get("protocol_version", ""))
payload = request.unsafe_json_body
sequence_accepted = request.matched_route.name == "api_logs"
if sequence_accepted:
if application.allow_permanent_storage:
schema = LogListPermanentSchema().bind(utcnow=datetime.datetime.utcnow())
else:
schema = LogListSchema().bind(utcnow=datetime.datetime.utcnow())
else:
if application.allow_permanent_storage:
schema = LogSchemaPermanent().bind(utcnow=datetime.datetime.utcnow())
else:
schema = LogSchema().bind(utcnow=datetime.datetime.utcnow())
deserialized_logs = schema.deserialize(payload)
    if not sequence_accepted:
deserialized_logs = [deserialized_logs]
rate_limiting(
request, application, "per_application_logs_rate_limit", len(deserialized_logs)
)
# we need to split those out so we can process the pkey ones one by one
non_pkey_logs = [
log_dict for log_dict in deserialized_logs if not log_dict["primary_key"]
]
pkey_dict = {}
    # try to process the logs as best as we can and group them together
    # to reduce the number of rows we have to insert
for log_dict in deserialized_logs:
if log_dict["primary_key"]:
key = (log_dict["primary_key"], log_dict["namespace"])
            if key not in pkey_dict:
pkey_dict[key] = []
pkey_dict[key].append(log_dict)
if non_pkey_logs:
log.debug("%s non-pkey logs received: %s" % (application, len(non_pkey_logs)))
tasks.add_logs.delay(application.resource_id, params, non_pkey_logs)
if pkey_dict:
logs_to_insert = []
for primary_key_tuple, payload in pkey_dict.items():
sorted_logs = sorted(payload, key=lambda x: x["date"])
logs_to_insert.append(sorted_logs[-1])
log.debug("%s pkey logs received: %s" % (application, len(logs_to_insert)))
tasks.add_logs.delay(application.resource_id, params, logs_to_insert)
    log.info(
        "LOG call %s %s client:%s",
        application,
        proto_version,
        request.headers.get("user_agent"),
    )
return "OK: Logs accepted"
@view_config(
route_name="api_request_stats",
renderer="string",
permission="create",
require_csrf=False,
)
@view_config(
route_name="api_metrics", renderer="string", permission="create", require_csrf=False
)
def request_metrics_create(request):
"""
Endpoint for performance metrics, aggregates view performance stats
and converts them to general metric row
"""
application = request.context.resource
if request.method.upper() == "OPTIONS":
return check_cors(request, application)
else:
check_cors(request, application, should_return=False)
params = dict(request.params.copy())
proto_version = parse_proto(params.get("protocol_version", ""))
payload = request.unsafe_json_body
schema = MetricsListSchema()
dataset = schema.deserialize(payload)
rate_limiting(
request, application, "per_application_metrics_rate_limit", len(dataset)
)
# looping report data
metrics = {}
for metric in dataset:
server_name = metric.get("server", "").lower() or "unknown"
start_interval = convert_date(metric["timestamp"])
start_interval = start_interval.replace(second=0, microsecond=0)
for view_name, view_metrics in metric["metrics"]:
key = "%s%s%s" % (metric["server"], start_interval, view_name)
            if key not in metrics:
metrics[key] = {
"requests": 0,
"main": 0,
"sql": 0,
"nosql": 0,
"remote": 0,
"tmpl": 0,
"custom": 0,
"sql_calls": 0,
"nosql_calls": 0,
"remote_calls": 0,
"tmpl_calls": 0,
"custom_calls": 0,
"start_interval": start_interval,
"server_name": server_name,
"view_name": view_name,
}
metrics[key]["requests"] += int(view_metrics["requests"])
metrics[key]["main"] += round(view_metrics["main"], 5)
metrics[key]["sql"] += round(view_metrics["sql"], 5)
metrics[key]["nosql"] += round(view_metrics["nosql"], 5)
metrics[key]["remote"] += round(view_metrics["remote"], 5)
metrics[key]["tmpl"] += round(view_metrics["tmpl"], 5)
metrics[key]["custom"] += round(view_metrics.get("custom", 0.0), 5)
metrics[key]["sql_calls"] += int(view_metrics.get("sql_calls", 0))
metrics[key]["nosql_calls"] += int(view_metrics.get("nosql_calls", 0))
metrics[key]["remote_calls"] += int(view_metrics.get("remote_calls", 0))
metrics[key]["tmpl_calls"] += int(view_metrics.get("tmpl_calls", 0))
metrics[key]["custom_calls"] += int(view_metrics.get("custom_calls", 0))
if not metrics[key]["requests"]:
# fix this here because validator can't
metrics[key]["requests"] = 1
    # the metrics dict is built up front to minimize the number of queries
    # when multiple rows arrive for the same minute
normalized_metrics = []
for metric in metrics.values():
new_metric = {
"namespace": "appenlight.request_metric",
"timestamp": metric.pop("start_interval"),
"server_name": metric["server_name"],
"tags": list(metric.items()),
}
normalized_metrics.append(new_metric)
tasks.add_metrics.delay(
application.resource_id, params, normalized_metrics, proto_version
)
    log.info(
        "REQUEST METRICS call %s %s client:%s",
        application.resource_name,
        proto_version,
        request.headers.get("user_agent"),
    )
return "OK: request metrics accepted"
@view_config(
route_name="api_general_metrics",
renderer="string",
permission="create",
require_csrf=False,
)
@view_config(
route_name="api_general_metric",
renderer="string",
permission="create",
require_csrf=False,
)
def general_metrics_create(request):
"""
Endpoint for general metrics aggregation
"""
application = request.context.resource
if request.method.upper() == "OPTIONS":
return check_cors(request, application)
else:
check_cors(request, application, should_return=False)
params = dict(request.params.copy())
proto_version = parse_proto(params.get("protocol_version", ""))
payload = request.unsafe_json_body
sequence_accepted = request.matched_route.name == "api_general_metrics"
if sequence_accepted:
if application.allow_permanent_storage:
schema = GeneralMetricsPermanentListSchema().bind(
utcnow=datetime.datetime.utcnow()
)
else:
schema = GeneralMetricsListSchema().bind(utcnow=datetime.datetime.utcnow())
else:
if application.allow_permanent_storage:
schema = GeneralMetricPermanentSchema().bind(
utcnow=datetime.datetime.utcnow()
)
else:
schema = GeneralMetricSchema().bind(utcnow=datetime.datetime.utcnow())
deserialized_metrics = schema.deserialize(payload)
    if not sequence_accepted:
deserialized_metrics = [deserialized_metrics]
rate_limiting(
request,
application,
"per_application_metrics_rate_limit",
len(deserialized_metrics),
)
tasks.add_metrics.delay(
application.resource_id, params, deserialized_metrics, proto_version
)
    log.info(
        "METRICS call %s %s client:%s",
        application.resource_name,
        proto_version,
        request.headers.get("user_agent"),
    )
return "OK: Metrics accepted"
@view_config(
route_name="api_reports", renderer="string", permission="create", require_csrf=False
)
@view_config(
route_name="api_slow_reports",
renderer="string",
permission="create",
require_csrf=False,
)
@view_config(
route_name="api_report", renderer="string", permission="create", require_csrf=False
)
def reports_create(request):
"""
Endpoint for exception and slowness reports
"""
application = request.context.resource
if request.method.upper() == "OPTIONS":
return check_cors(request, application)
else:
check_cors(request, application, should_return=False)
params = dict(request.params.copy())
proto_version = parse_proto(params.get("protocol_version", ""))
payload = request.unsafe_json_body
sequence_accepted = request.matched_route.name == "api_reports"
if sequence_accepted:
schema = ReportListSchema_0_5().bind(utcnow=datetime.datetime.utcnow())
else:
schema = ReportSchema_0_5().bind(utcnow=datetime.datetime.utcnow())
deserialized_reports = schema.deserialize(payload)
    if not sequence_accepted:
deserialized_reports = [deserialized_reports]
if deserialized_reports:
rate_limiting(
request,
application,
"per_application_reports_rate_limit",
len(deserialized_reports),
)
tasks.add_reports.delay(application.resource_id, params, deserialized_reports)
    log.info(
        "REPORT call %s, %s client:%s",
        application,
        proto_version,
        request.headers.get("user_agent"),
    )
return "OK: Reports accepted"
@view_config(
route_name="api_airbrake",
renderer="string",
permission="create",
require_csrf=False,
)
def airbrake_xml_compat(request):
"""
    Airbrake-compatible endpoint for XML reports
"""
application = request.context.resource
if request.method.upper() == "OPTIONS":
return check_cors(request, application)
else:
check_cors(request, application, should_return=False)
params = dict(request.params.copy())
error_dict = parse_airbrake_xml(request)
schema = ReportListSchema_0_5().bind(utcnow=datetime.datetime.utcnow())
deserialized_reports = schema.deserialize([error_dict])
rate_limiting(
request,
application,
"per_application_reports_rate_limit",
len(deserialized_reports),
)
tasks.add_reports.delay(application.resource_id, params, deserialized_reports)
    log.info(
        "AIRBRAKE call for application %s, api_ver:%s client:%s",
        application.resource_name,
        request.params.get("protocol_version", "unknown"),
        request.headers.get("user_agent"),
    )
return (
"<notice><id>no-id</id><url>%s</url></notice>"
% request.registry.settings["mailing.app_url"]
)


def decompress_gzip(data):
    try:
        # request bodies arrive as bytes, so use a binary buffer
        fp = io.BytesIO(data)
        with GzipFile(fileobj=fp) as f:
            return f.read()
    except Exception as exc:
        # log the failure and convert it to a 400 response
        log.error(exc)
        raise HTTPBadRequest()


def decompress_zlib(data):
    try:
        return zlib.decompress(data)
    except Exception as exc:
        log.error(exc)
        raise HTTPBadRequest()


def decode_b64(data):
    try:
        return base64.b64decode(data)
    except Exception as exc:
        log.error(exc)
        raise HTTPBadRequest()
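
# These helpers reverse the envelope some Sentry clients send: JSON that
# is zlib-compressed and then base64-encoded. A runnable round-trip
# sketch (the payload content is made up):
#
#     import base64, json, zlib
#     raw = json.dumps({"message": "boom"}).encode("utf8")
#     wire = base64.b64encode(zlib.compress(raw))
#     assert decompress_zlib(decode_b64(wire)) == raw
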
@view_config(
route_name="api_sentry", renderer="string", permission="create", require_csrf=False
)
@view_config(
route_name="api_sentry_slash",
renderer="string",
permission="create",
require_csrf=False,
)
def sentry_compat(request):
"""
    Sentry-compatible endpoint
"""
application = request.context.resource
if request.method.upper() == "OPTIONS":
return check_cors(request, application)
else:
check_cors(request, application, should_return=False)
# handle various report encoding
content_encoding = request.headers.get("Content-Encoding")
content_type = request.headers.get("Content-Type")
if content_encoding == "gzip":
body = decompress_gzip(request.body)
elif content_encoding == "deflate":
body = decompress_zlib(request.body)
else:
body = request.body
    # attempt to fix the string before decoding, for clients that
    # url-encode their payloads
if content_type == "application/x-www-form-urlencoded":
body = urllib.parse.unquote(body.decode("utf8"))
check_char = "{" if isinstance(body, str) else b"{"
if not body.startswith(check_char):
try:
body = decode_b64(body)
body = decompress_zlib(body)
except Exception as exc:
log.info(exc)
    try:
        # body may already be a str if it was url-decoded above
        if isinstance(body, bytes):
            body = body.decode("utf8")
        json_body = json.loads(body)
    except ValueError:
        raise JSONException("Incorrect JSON")
event, event_type = parse_sentry_event(json_body)
if event_type == ParsedSentryEventType.LOG:
if application.allow_permanent_storage:
schema = LogSchemaPermanent().bind(utcnow=datetime.datetime.utcnow())
else:
schema = LogSchema().bind(utcnow=datetime.datetime.utcnow())
deserialized_logs = schema.deserialize(event)
non_pkey_logs = [deserialized_logs]
log.debug("%s non-pkey logs received: %s" % (application, len(non_pkey_logs)))
tasks.add_logs.delay(application.resource_id, {}, non_pkey_logs)
if event_type == ParsedSentryEventType.ERROR_REPORT:
schema = ReportSchema_0_5().bind(
utcnow=datetime.datetime.utcnow(),
allow_permanent_storage=application.allow_permanent_storage,
)
deserialized_reports = [schema.deserialize(event)]
rate_limiting(
request,
application,
"per_application_reports_rate_limit",
len(deserialized_reports),
)
tasks.add_reports.delay(application.resource_id, {}, deserialized_reports)
return "OK: Events accepted"