# -*- coding: utf-8 -*-

# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import io
import datetime
import json
import logging
import urllib.request, urllib.parse, urllib.error
import zlib

from gzip import GzipFile
from pyramid.view import view_config
from pyramid.httpexceptions import HTTPBadRequest

import appenlight.celery.tasks as tasks
from appenlight.lib.api import rate_limiting, check_cors
from appenlight.lib.enums import ParsedSentryEventType
from appenlight.lib.utils import parse_proto
from appenlight.lib.utils.airbrake import parse_airbrake_xml
from appenlight.lib.utils.date_utils import convert_date
from appenlight.lib.utils.sentry import parse_sentry_event
from appenlight.lib.request import JSONException
from appenlight.validators import (LogListSchema,
                                   MetricsListSchema,
                                   GeneralMetricsListSchema,
                                   GeneralMetricsPermanentListSchema,
                                   GeneralMetricSchema,
                                   GeneralMetricPermanentSchema,
                                   LogListPermanentSchema,
                                   ReportListSchema_0_5,
                                   LogSchema,
                                   LogSchemaPermanent,
                                   ReportSchema_0_5)

log = logging.getLogger(__name__)

@view_config(route_name='api_logs', renderer='string', permission='create',
             require_csrf=False)
@view_config(route_name='api_log', renderer='string', permission='create',
             require_csrf=False)
def logs_create(request):
    """
    Endpoint for log aggregation
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    else:
        check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get('protocol_version', ''))
    payload = request.unsafe_json_body
    sequence_accepted = request.matched_route.name == 'api_logs'

    if sequence_accepted:
        if application.allow_permanent_storage:
            schema = LogListPermanentSchema().bind(
                utcnow=datetime.datetime.utcnow())
        else:
            schema = LogListSchema().bind(
                utcnow=datetime.datetime.utcnow())
    else:
        if application.allow_permanent_storage:
            schema = LogSchemaPermanent().bind(
                utcnow=datetime.datetime.utcnow())
        else:
            schema = LogSchema().bind(
                utcnow=datetime.datetime.utcnow())

    deserialized_logs = schema.deserialize(payload)
    if sequence_accepted is False:
        deserialized_logs = [deserialized_logs]

    rate_limiting(request, application, 'per_application_logs_rate_limit',
                  len(deserialized_logs))

    # pprint.pprint(deserialized_logs)

    # we need to split those out so we can process the pkey ones one by one
    non_pkey_logs = [log_dict for log_dict in deserialized_logs
                     if not log_dict['primary_key']]
    pkey_dict = {}
    # try to process the logs as best as we can and group them together
    # to reduce the amount of entries we need to insert
    for log_dict in deserialized_logs:
        if log_dict['primary_key']:
            key = (log_dict['primary_key'], log_dict['namespace'],)
            if key not in pkey_dict:
                pkey_dict[key] = []
            pkey_dict[key].append(log_dict)
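    # Illustrative sketch (not part of the API contract): two log entries that
    # share the same primary_key/namespace pair end up in one bucket, and only
    # the newest one (sorted_logs[-1] below) is handed to tasks.add_logs, e.g.
    #
    #   pkey_dict[('task-1', 'worker')] == [
    #       {'primary_key': 'task-1', 'namespace': 'worker', 'date': t0, ...},
    #       {'primary_key': 'task-1', 'namespace': 'worker', 'date': t1, ...},
    #   ]   # -> only the t1 entry is inserted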

    if non_pkey_logs:
        log.debug('%s non-pkey logs received: %s' % (application,
                                                     len(non_pkey_logs)))
        tasks.add_logs.delay(application.resource_id, params, non_pkey_logs)
    if pkey_dict:
        logs_to_insert = []
        for primary_key_tuple, payload in pkey_dict.items():
            sorted_logs = sorted(payload, key=lambda x: x['date'])
            logs_to_insert.append(sorted_logs[-1])
        log.debug('%s pkey logs received: %s' % (application,
                                                 len(logs_to_insert)))
        tasks.add_logs.delay(application.resource_id, params, logs_to_insert)

    log.info('LOG call %s %s client:%s' % (
        application, proto_version, request.headers.get('user_agent')))
    return 'OK: Logs accepted'


@view_config(route_name='api_request_stats', renderer='string',
             permission='create', require_csrf=False)
@view_config(route_name='api_metrics', renderer='string',
             permission='create', require_csrf=False)
def request_metrics_create(request):
    """
    Endpoint for performance metrics, aggregates view performance stats
    and converts them to general metric rows
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    else:
        check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get('protocol_version', ''))

    payload = request.unsafe_json_body
    schema = MetricsListSchema()
    dataset = schema.deserialize(payload)

    rate_limiting(request, application, 'per_application_metrics_rate_limit',
                  len(dataset))

    # loop over the report data
    metrics = {}
    for metric in dataset:
        server_name = metric.get('server', '').lower() or 'unknown'
        start_interval = convert_date(metric['timestamp'])
        start_interval = start_interval.replace(second=0, microsecond=0)

        for view_name, view_metrics in metric['metrics']:
            key = '%s%s%s' % (metric['server'], start_interval, view_name)
            if key not in metrics:
                metrics[key] = {"requests": 0, "main": 0, "sql": 0,
                                "nosql": 0, "remote": 0, "tmpl": 0,
                                "custom": 0, 'sql_calls': 0,
                                'nosql_calls': 0,
                                'remote_calls': 0, 'tmpl_calls': 0,
                                'custom_calls': 0,
                                "start_interval": start_interval,
                                "server_name": server_name,
                                "view_name": view_name
                                }
            metrics[key]["requests"] += int(view_metrics['requests'])
            metrics[key]["main"] += round(view_metrics['main'], 5)
            metrics[key]["sql"] += round(view_metrics['sql'], 5)
            metrics[key]["nosql"] += round(view_metrics['nosql'], 5)
            metrics[key]["remote"] += round(view_metrics['remote'], 5)
            metrics[key]["tmpl"] += round(view_metrics['tmpl'], 5)
            metrics[key]["custom"] += round(view_metrics.get('custom', 0.0),
                                            5)
            metrics[key]["sql_calls"] += int(
                view_metrics.get('sql_calls', 0))
            metrics[key]["nosql_calls"] += int(
                view_metrics.get('nosql_calls', 0))
            metrics[key]["remote_calls"] += int(
                view_metrics.get('remote_calls', 0))
            metrics[key]["tmpl_calls"] += int(
                view_metrics.get('tmpl_calls', 0))
            metrics[key]["custom_calls"] += int(
                view_metrics.get('custom_calls', 0))

            if not metrics[key]["requests"]:
                # fix this here because validator can't
                metrics[key]["requests"] = 1

    # the metrics dict is built this way to minimize the amount of queries
    # used in case we get multiple rows from the same minute

    normalized_metrics = []
    for metric in metrics.values():
        new_metric = {
            'namespace': 'appenlight.request_metric',
            'timestamp': metric.pop('start_interval'),
            'server_name': metric['server_name'],
            'tags': list(metric.items())
        }
        normalized_metrics.append(new_metric)

    tasks.add_metrics.delay(application.resource_id, params,
                            normalized_metrics, proto_version)

    log.info('REQUEST METRICS call {} {} client:{}'.format(
        application.resource_name, proto_version,
        request.headers.get('user_agent')))
    return 'OK: request metrics accepted'

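# Illustrative sketch of the aggregation above, under the assumption that the
# client sends one entry per server/timestamp with per-view timings (the
# authoritative shape is whatever MetricsListSchema accepts). A dataset item
# like
#
#   {'server': 'web01', 'timestamp': '2017-01-01T10:00:00',
#    'metrics': [['home', {'requests': 2, 'main': 0.2, 'sql': 0.1,
#                          'nosql': 0, 'remote': 0, 'tmpl': 0.05,
#                          'sql_calls': 3}]]}
#
# is folded into one normalized row roughly like
#
#   {'namespace': 'appenlight.request_metric',
#    'timestamp': <start of that minute>,
#    'server_name': 'web01',
#    'tags': [('requests', 2), ('main', 0.2), ..., ('view_name', 'home')]}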

@view_config(route_name='api_general_metrics', renderer='string',
             permission='create', require_csrf=False)
@view_config(route_name='api_general_metric', renderer='string',
             permission='create', require_csrf=False)
def general_metrics_create(request):
    """
    Endpoint for general metrics aggregation
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    else:
        check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get('protocol_version', ''))
    payload = request.unsafe_json_body
    sequence_accepted = request.matched_route.name == 'api_general_metrics'
    if sequence_accepted:
        if application.allow_permanent_storage:
            schema = GeneralMetricsPermanentListSchema().bind(
                utcnow=datetime.datetime.utcnow())
        else:
            schema = GeneralMetricsListSchema().bind(
                utcnow=datetime.datetime.utcnow())
    else:
        if application.allow_permanent_storage:
            schema = GeneralMetricPermanentSchema().bind(
                utcnow=datetime.datetime.utcnow())
        else:
            schema = GeneralMetricSchema().bind(
                utcnow=datetime.datetime.utcnow())

    deserialized_metrics = schema.deserialize(payload)
    if sequence_accepted is False:
        deserialized_metrics = [deserialized_metrics]

    rate_limiting(request, application, 'per_application_metrics_rate_limit',
                  len(deserialized_metrics))

    tasks.add_metrics.delay(application.resource_id, params,
                            deserialized_metrics, proto_version)

    log.info('METRICS call {} {} client:{}'.format(
        application.resource_name, proto_version,
        request.headers.get('user_agent')))
    return 'OK: Metrics accepted'

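# A minimal sketch of a single general metric payload, assuming its shape
# mirrors what request_metrics_create() builds internally (namespace,
# timestamp, server_name and a list of tag pairs); the authoritative field
# list is defined by GeneralMetricSchema / GeneralMetricsListSchema:
#
#   {'namespace': 'myapp.counters', 'timestamp': '2017-01-01T10:00:00',
#    'server_name': 'web01', 'tags': [['queue_size', 12]]}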

@view_config(route_name='api_reports', renderer='string', permission='create',
             require_csrf=False)
@view_config(route_name='api_slow_reports', renderer='string',
             permission='create', require_csrf=False)
@view_config(route_name='api_report', renderer='string', permission='create',
             require_csrf=False)
def reports_create(request):
    """
    Endpoint for exception and slowness reports
    """
    # route_url('reports')
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    else:
        check_cors(request, application, should_return=False)
    params = dict(request.params.copy())
    proto_version = parse_proto(params.get('protocol_version', ''))
    payload = request.unsafe_json_body
    sequence_accepted = request.matched_route.name == 'api_reports'

    if sequence_accepted:
        schema = ReportListSchema_0_5().bind(
            utcnow=datetime.datetime.utcnow())
    else:
        schema = ReportSchema_0_5().bind(
            utcnow=datetime.datetime.utcnow())

    deserialized_reports = schema.deserialize(payload)
    if sequence_accepted is False:
        deserialized_reports = [deserialized_reports]
    if deserialized_reports:
        rate_limiting(request, application,
                      'per_application_reports_rate_limit',
                      len(deserialized_reports))

        # pprint.pprint(deserialized_reports)
        tasks.add_reports.delay(application.resource_id, params,
                                deserialized_reports)
    log.info('REPORT call %s, %s client:%s' % (
        application,
        proto_version,
        request.headers.get('user_agent'))
    )
    return 'OK: Reports accepted'

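# Hypothetical minimal report payload, for orientation only; every field name
# below is an assumption and the authoritative definition lives in
# ReportSchema_0_5 / ReportListSchema_0_5 in appenlight.validators:
#
#   [{'client': 'my-client', 'language': 'python', 'view_name': 'home',
#     'server': 'web01', 'http_status': 500, 'error': 'ValueError: boom',
#     'request_id': 'abc123', 'start_time': '2017-01-01T10:00:00',
#     'end_time': '2017-01-01T10:00:01'}]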

@view_config(route_name='api_airbrake', renderer='string', permission='create',
             require_csrf=False)
def airbrake_xml_compat(request):
    """
    Airbrake compatible endpoint for XML reports
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    else:
        check_cors(request, application, should_return=False)

    params = dict(request.params.copy())

    error_dict = parse_airbrake_xml(request)
    schema = ReportListSchema_0_5().bind(utcnow=datetime.datetime.utcnow())
    deserialized_reports = schema.deserialize([error_dict])
    rate_limiting(request, application, 'per_application_reports_rate_limit',
                  len(deserialized_reports))

    tasks.add_reports.delay(application.resource_id, params,
                            deserialized_reports)
    log.info('%s AIRBRAKE call for application %s, api_ver:%s client:%s' % (
        500, application.resource_name,
        request.params.get('protocol_version', 'unknown'),
        request.headers.get('user_agent'))
    )
    return '<notice><id>no-id</id><url>%s</url></notice>' % \
           request.registry.settings['mailing.app_url']

def decompress_gzip(data):
    try:
        # GzipFile needs a binary file object, request.body is bytes
        fp = io.BytesIO(data)
        with GzipFile(fileobj=fp) as f:
            return f.read()
    except Exception as exc:
        log.error(exc)
        raise HTTPBadRequest()


def decompress_zlib(data):
    try:
        return zlib.decompress(data)
    except Exception as exc:
        log.error(exc)
        raise HTTPBadRequest()


def decode_b64(data):
    try:
        return base64.b64decode(data)
    except Exception as exc:
        log.error(exc)
        raise HTTPBadRequest()

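# How these helpers combine in sentry_compat() below, shown as a sketch; the
# sample body here is made up, only the call chain mirrors the real code:
#
#   raw = base64.b64encode(zlib.compress(b'{"event_id": "abc"}'))
#   body = decode_b64(raw)          # -> zlib-compressed bytes
#   body = decompress_zlib(body)    # -> b'{"event_id": "abc"}'
#
# gzip ("Content-Encoding: gzip") and deflate bodies go straight through
# decompress_gzip() / decompress_zlib() instead.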

@view_config(route_name='api_sentry', renderer='string', permission='create',
             require_csrf=False)
@view_config(route_name='api_sentry_slash', renderer='string',
             permission='create', require_csrf=False)
def sentry_compat(request):
    """
    Sentry compatible endpoint
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    else:
        check_cors(request, application, should_return=False)

    # handle various report encodings
    content_encoding = request.headers.get('Content-Encoding')
    content_type = request.headers.get('Content-Type')
    if content_encoding == 'gzip':
        body = decompress_gzip(request.body)
    elif content_encoding == 'deflate':
        body = decompress_zlib(request.body)
    else:
        body = request.body
    # attempt to fix the string before decoding, for misbehaving clients
    if content_type == 'application/x-www-form-urlencoded':
        body = urllib.parse.unquote(body.decode('utf8'))
    check_char = '{' if isinstance(body, str) else b'{'
    if not body.startswith(check_char):
        # some clients send the payload base64-encoded and zlib-compressed
        try:
            body = decode_b64(body)
            body = decompress_zlib(body)
        except Exception as exc:
            log.info(exc)

    try:
        # body may already be a str at this point (urlencoded branch above)
        if isinstance(body, bytes):
            body = body.decode('utf8')
        json_body = json.loads(body)
    except ValueError:
        raise JSONException("Incorrect JSON")

    event, event_type = parse_sentry_event(json_body)

    if event_type == ParsedSentryEventType.LOG:
        if application.allow_permanent_storage:
            schema = LogSchemaPermanent().bind(
                utcnow=datetime.datetime.utcnow())
        else:
            schema = LogSchema().bind(
                utcnow=datetime.datetime.utcnow())
        deserialized_logs = schema.deserialize(event)
        non_pkey_logs = [deserialized_logs]
        log.debug('%s non-pkey logs received: %s' % (application,
                                                     len(non_pkey_logs)))
        tasks.add_logs.delay(application.resource_id, {}, non_pkey_logs)
    if event_type == ParsedSentryEventType.ERROR_REPORT:
        schema = ReportSchema_0_5().bind(
            utcnow=datetime.datetime.utcnow(),
            allow_permanent_storage=application.allow_permanent_storage)
        deserialized_reports = [schema.deserialize(event)]
        rate_limiting(request, application,
                      'per_application_reports_rate_limit',
                      len(deserialized_reports))
        tasks.add_reports.delay(application.resource_id, {},
                                deserialized_reports)
    return 'OK: Events accepted'