##// END OF EJS Templates
ini: added new key
ini: added new key

File last commit:

r112:998f0d14
r129:489ce37b
Show More
api.py
435 lines | 16.7 KiB | text/x-python | PythonLexer
# -*- coding: utf-8 -*-
# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import io
import datetime
import json
import logging
import urllib.request, urllib.parse, urllib.error
import zlib
from gzip import GzipFile
from pyramid.view import view_config
from pyramid.httpexceptions import HTTPBadRequest
import appenlight.celery.tasks as tasks
from appenlight.lib.api import rate_limiting, check_cors
from appenlight.lib.enums import ParsedSentryEventType
from appenlight.lib.utils import parse_proto
from appenlight.lib.utils.airbrake import parse_airbrake_xml
from appenlight.lib.utils.date_utils import convert_date
from appenlight.lib.utils.sentry import parse_sentry_event
from appenlight.lib.request import JSONException
from appenlight.validators import (LogListSchema,
MetricsListSchema,
GeneralMetricsListSchema,
GeneralMetricsPermanentListSchema,
GeneralMetricSchema,
GeneralMetricPermanentSchema,
LogListPermanentSchema,
ReportListSchema_0_5,
LogSchema,
LogSchemaPermanent,
ReportSchema_0_5)
# Module-level logger used by the API views and helpers in this file.
log = logging.getLogger(__name__)
@view_config(route_name='api_logs', renderer='string', permission='create',
             require_csrf=False)
@view_config(route_name='api_log', renderer='string', permission='create',
             require_csrf=False)
def logs_create(request):
    """
    Endpoint for log aggregation.

    Serves the ``api_logs`` route (payload is a sequence of entries) and the
    ``api_log`` route (payload is a single entry).  The JSON body is
    validated, counted against ``per_application_logs_rate_limit`` and then
    handed to the ``add_logs`` task.

    Entries without a ``primary_key`` are queued as received.  Entries that
    share a ``(primary_key, namespace)`` pair are collapsed: only the one
    that sorts last by ``date`` is queued.

    :param request: request whose ``context.resource`` is the application
    :return: the ``check_cors`` result for ``OPTIONS`` requests, otherwise
        the string ``'OK: Logs accepted'``
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get('protocol_version', ''))
    payload = request.unsafe_json_body
    sequence_accepted = request.matched_route.name == 'api_logs'

    # pick the validator matching the route and the storage policy
    permanent = application.allow_permanent_storage
    if sequence_accepted:
        schema_cls = LogListPermanentSchema if permanent else LogListSchema
    else:
        schema_cls = LogSchemaPermanent if permanent else LogSchema
    schema = schema_cls().bind(utcnow=datetime.datetime.utcnow())

    deserialized_logs = schema.deserialize(payload)
    if not sequence_accepted:
        # single-entry route: normalise to a list for the code below
        deserialized_logs = [deserialized_logs]

    rate_limiting(request, application, 'per_application_logs_rate_limit',
                  len(deserialized_logs))

    # split entries: those without a primary key go through untouched,
    # the rest are grouped per (primary_key, namespace)
    non_pkey_logs = []
    pkey_dict = {}
    for entry in deserialized_logs:
        if not entry['primary_key']:
            non_pkey_logs.append(entry)
            continue
        group_key = (entry['primary_key'], entry['namespace'],)
        pkey_dict.setdefault(group_key, []).append(entry)

    if non_pkey_logs:
        log.debug('%s non-pkey logs received: %s' % (application,
                                                     len(non_pkey_logs)))
        tasks.add_logs.delay(application.resource_id, params, non_pkey_logs)

    if pkey_dict:
        # keep only the newest entry of every group
        logs_to_insert = [
            sorted(group, key=lambda x: x['date'])[-1]
            for group in pkey_dict.values()
        ]
        log.debug('%s pkey logs received: %s' % (application,
                                                 len(logs_to_insert)))
        tasks.add_logs.delay(application.resource_id, params, logs_to_insert)

    log.info('LOG call %s %s client:%s' % (
        application, proto_version, request.headers.get('user_agent')))
    return 'OK: Logs accepted'
@view_config(route_name='api_request_stats', renderer='string',
             permission='create', require_csrf=False)
@view_config(route_name='api_metrics', renderer='string',
             permission='create', require_csrf=False)
def request_metrics_create(request):
    """
    Endpoint for performance metrics, aggregates view performance stats
    and converts them to general metric rows.

    Every submitted metric is truncated to the minute and its per-view
    numbers are summed into one row per (server, minute, view).  The rows
    are then passed to the ``add_metrics`` task under the
    ``appenlight.request_metric`` namespace.

    :param request: request whose ``context.resource`` is the application
    :return: the ``check_cors`` result for ``OPTIONS`` requests, otherwise
        the string ``'OK: request metrics accepted'``
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    else:
        check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get('protocol_version', ''))
    payload = request.unsafe_json_body
    schema = MetricsListSchema()
    dataset = schema.deserialize(payload)

    rate_limiting(request, application, 'per_application_metrics_rate_limit',
                  len(dataset))

    timing_fields = ('main', 'sql', 'nosql', 'remote', 'tmpl')
    call_fields = ('sql_calls', 'nosql_calls', 'remote_calls', 'tmpl_calls',
                   'custom_calls')

    # metrics dict is being built to minimize the amount of queries used
    # in case we get multiple rows from same minute
    metrics = {}
    for metric in dataset:
        server_name = metric.get('server', '').lower() or 'unknown'
        start_interval = convert_date(metric['timestamp'])
        start_interval = start_interval.replace(second=0, microsecond=0)
        for view_name, view_metrics in metric['metrics']:
            # build the key from the normalised server name so it agrees
            # with the ``server_name`` stored in the row
            key = '%s%s%s' % (server_name, start_interval, view_name)
            # the membership test must use ``key``; testing
            # ``start_interval`` was always true and reset the row,
            # throwing away everything aggregated so far
            if key not in metrics:
                metrics[key] = {"requests": 0, "main": 0, "sql": 0,
                                "nosql": 0, "remote": 0, "tmpl": 0,
                                "custom": 0, 'sql_calls': 0,
                                'nosql_calls': 0,
                                'remote_calls': 0, 'tmpl_calls': 0,
                                'custom_calls': 0,
                                "start_interval": start_interval,
                                "server_name": server_name,
                                "view_name": view_name
                                }
            row = metrics[key]
            row["requests"] += int(view_metrics['requests'])
            for field in timing_fields:
                row[field] += round(view_metrics[field], 5)
            row["custom"] += round(view_metrics.get('custom', 0.0), 5)
            for field in call_fields:
                row[field] += int(view_metrics.get(field, 0))

    normalized_metrics = []
    for row in metrics.values():
        if not row["requests"]:
            # fix this here because validator can't; done once per final
            # row so a zero-request sample does not inflate a later total
            row["requests"] = 1
        normalized_metrics.append({
            'namespace': 'appenlight.request_metric',
            # popped first so that ``tags`` does not contain the timestamp
            'timestamp': row.pop('start_interval'),
            'server_name': row['server_name'],
            'tags': list(row.items())
        })

    tasks.add_metrics.delay(application.resource_id, params,
                            normalized_metrics, proto_version)

    log.info('REQUEST METRICS call {} {} client:{}'.format(
        application.resource_name, proto_version,
        request.headers.get('user_agent')))
    return 'OK: request metrics accepted'
@view_config(route_name='api_general_metrics', renderer='string',
             permission='create', require_csrf=False)
@view_config(route_name='api_general_metric', renderer='string',
             permission='create', require_csrf=False)
def general_metrics_create(request):
    """
    Endpoint for general metrics aggregation.

    Serves the ``api_general_metrics`` route (payload is a sequence of
    metrics) and the ``api_general_metric`` route (payload is a single
    metric).  The validated metrics are counted against
    ``per_application_metrics_rate_limit`` and passed to the
    ``add_metrics`` task.

    :param request: request whose ``context.resource`` is the application
    :return: the ``check_cors`` result for ``OPTIONS`` requests, otherwise
        the string ``'OK: Metrics accepted'``
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get('protocol_version', ''))
    payload = request.unsafe_json_body
    sequence_accepted = request.matched_route.name == 'api_general_metrics'

    # pick the validator matching the route and the storage policy
    permanent = application.allow_permanent_storage
    if sequence_accepted:
        schema_cls = (GeneralMetricsPermanentListSchema if permanent
                      else GeneralMetricsListSchema)
    else:
        schema_cls = (GeneralMetricPermanentSchema if permanent
                      else GeneralMetricSchema)
    schema = schema_cls().bind(utcnow=datetime.datetime.utcnow())

    deserialized_metrics = schema.deserialize(payload)
    if not sequence_accepted:
        # single-metric route: normalise to a list for the task
        deserialized_metrics = [deserialized_metrics]

    rate_limiting(request, application, 'per_application_metrics_rate_limit',
                  len(deserialized_metrics))

    tasks.add_metrics.delay(application.resource_id, params,
                            deserialized_metrics, proto_version)

    client = request.headers.get('user_agent')
    log.info('METRICS call {} {} client:{}'.format(
        application.resource_name, proto_version, client))
    return 'OK: Metrics accepted'
@view_config(route_name='api_reports', renderer='string', permission='create',
             require_csrf=False)
@view_config(route_name='api_slow_reports', renderer='string',
             permission='create', require_csrf=False)
@view_config(route_name='api_report', renderer='string', permission='create',
             require_csrf=False)
def reports_create(request):
    """
    Endpoint for exception and slowness reports.

    Only the ``api_reports`` route validates the payload as a list of
    reports; the other routes bound to this view validate it as a single
    report.  Non-empty submissions are counted against
    ``per_application_reports_rate_limit`` and passed to the
    ``add_reports`` task.

    :param request: request whose ``context.resource`` is the application
    :return: the ``check_cors`` result for ``OPTIONS`` requests, otherwise
        the string ``'OK: Reports accepted'``
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get('protocol_version', ''))
    payload = request.unsafe_json_body
    sequence_accepted = request.matched_route.name == 'api_reports'

    schema_cls = ReportListSchema_0_5 if sequence_accepted else ReportSchema_0_5
    schema = schema_cls().bind(utcnow=datetime.datetime.utcnow())

    deserialized_reports = schema.deserialize(payload)
    if not sequence_accepted:
        # single-report routes: normalise to a list for the task
        deserialized_reports = [deserialized_reports]

    if deserialized_reports:
        rate_limiting(request, application,
                      'per_application_reports_rate_limit',
                      len(deserialized_reports))
        tasks.add_reports.delay(application.resource_id, params,
                                deserialized_reports)

    client = request.headers.get('user_agent')
    log.info('REPORT call %s, %s client:%s' % (
        application, proto_version, client))
    return 'OK: Reports accepted'
@view_config(route_name='api_airbrake', renderer='string', permission='create',
             require_csrf=False)
def airbrake_xml_compat(request):
    """
    Airbrake compatible endpoint for XML reports.

    The request is converted with ``parse_airbrake_xml``, validated as a
    one-element report list, counted against
    ``per_application_reports_rate_limit`` and passed to the
    ``add_reports`` task.

    :param request: request whose ``context.resource`` is the application
    :return: the ``check_cors`` result for ``OPTIONS`` requests, otherwise
        a ``<notice>`` XML string containing the ``mailing.app_url`` setting
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    check_cors(request, application, should_return=False)

    params = dict(request.params.copy())

    parsed_report = parse_airbrake_xml(request)
    schema = ReportListSchema_0_5().bind(utcnow=datetime.datetime.utcnow())
    deserialized_reports = schema.deserialize([parsed_report])

    rate_limiting(request, application, 'per_application_reports_rate_limit',
                  len(deserialized_reports))

    tasks.add_reports.delay(application.resource_id, params,
                            deserialized_reports)

    api_version = request.params.get('protocol_version', 'unknown')
    client = request.headers.get('user_agent')
    log.info('%s AIRBRAKE call for application %s, api_ver:%s client:%s' % (
        500, application.resource_name, api_version, client))

    app_url = request.registry.settings['mailing.app_url']
    return '<notice><id>no-id</id><url>%s</url></notice>' % app_url
def decompress_gzip(data):
    """
    Decompress a gzip-encoded payload.

    :param data: gzip-compressed ``bytes``
    :return: the decompressed ``bytes``
    :raises HTTPBadRequest: when ``data`` cannot be decompressed; the
        underlying error is logged first
    """
    try:
        # GzipFile reads binary data, so the buffer has to be BytesIO;
        # StringIO refuses a bytes initial value
        with GzipFile(fileobj=io.BytesIO(data)) as f:
            return f.read()
    except Exception as exc:
        log.error(exc)
        raise HTTPBadRequest()
def decompress_zlib(data):
    """
    Decompress a zlib (deflate) encoded payload.

    :param data: zlib-compressed ``bytes``
    :return: the decompressed ``bytes``
    :raises HTTPBadRequest: when ``data`` cannot be decompressed; the
        underlying error is logged first
    """
    try:
        return zlib.decompress(data)
    except Exception as exc:
        # a stray bare ``raise`` used to make this handling unreachable
        log.error(exc)
        raise HTTPBadRequest()
def decode_b64(data):
    """
    Decode a base64 encoded payload.

    :param data: base64 text as ``bytes`` or ASCII ``str``
    :return: the decoded ``bytes``
    :raises HTTPBadRequest: when ``data`` is not valid base64; the
        underlying error is logged first
    """
    try:
        return base64.b64decode(data)
    except Exception as exc:
        # a stray bare ``raise`` used to make this handling unreachable
        log.error(exc)
        raise HTTPBadRequest()
@view_config(route_name='api_sentry', renderer='string', permission='create',
             require_csrf=False)
@view_config(route_name='api_sentry_slash', renderer='string',
             permission='create', require_csrf=False)
def sentry_compat(request):
    """
    Sentry compatible endpoint.

    The body is decoded according to the ``Content-Encoding`` header
    (``gzip`` or ``deflate``).  Without a recognised encoding the raw body
    is used; if it does not start with ``{`` a base64 + zlib decode is
    attempted on a best-effort basis.  The resulting JSON is converted with
    ``parse_sentry_event`` and forwarded to the ``add_logs`` or
    ``add_reports`` task depending on the parsed event type.

    :param request: request whose ``context.resource`` is the application
    :return: the ``check_cors`` result for ``OPTIONS`` requests, otherwise
        the string ``'OK: Events accepted'``
    :raises JSONException: when the decoded body is not valid JSON
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    else:
        check_cors(request, application, should_return=False)

    # handle various report encoding
    content_encoding = request.headers.get('Content-Encoding')
    content_type = request.headers.get('Content-Type')
    if content_encoding == 'gzip':
        body = decompress_gzip(request.body)
    elif content_encoding == 'deflate':
        body = decompress_zlib(request.body)
    else:
        body = request.body
        # attempt to fix string before decoding for stupid clients
        if content_type == 'application/x-www-form-urlencoded':
            # NOTE: from here on ``body`` is ``str`` rather than ``bytes``
            body = urllib.parse.unquote(body.decode('utf8'))
        check_char = '{' if isinstance(body, str) else b'{'
        if not body.startswith(check_char):
            # not plain JSON: try the base64 + zlib form, keep the body
            # as it is when that does not work out
            try:
                body = decode_b64(body)
                body = decompress_zlib(body)
            except Exception as exc:
                log.info(exc)

    try:
        # ``body`` may already be text (urlencoded branch above); calling
        # ``.decode`` on it raised AttributeError instead of reporting
        # incorrect JSON.  UnicodeDecodeError is a ValueError subclass.
        if isinstance(body, bytes):
            body = body.decode('utf8')
        json_body = json.loads(body)
    except ValueError:
        raise JSONException("Incorrect JSON")

    event, event_type = parse_sentry_event(json_body)

    if event_type == ParsedSentryEventType.LOG:
        if application.allow_permanent_storage:
            schema = LogSchemaPermanent().bind(
                utcnow=datetime.datetime.utcnow())
        else:
            schema = LogSchema().bind(
                utcnow=datetime.datetime.utcnow())
        deserialized_logs = schema.deserialize(event)
        non_pkey_logs = [deserialized_logs]
        # NOTE(review): unlike error reports below, log events are queued
        # here without a rate_limiting() call - confirm this is intended
        log.debug('%s non-pkey logs received: %s' % (application,
                                                     len(non_pkey_logs)))
        tasks.add_logs.delay(application.resource_id, {}, non_pkey_logs)

    if event_type == ParsedSentryEventType.ERROR_REPORT:
        schema = ReportSchema_0_5().bind(
            utcnow=datetime.datetime.utcnow(),
            allow_permanent_storage=application.allow_permanent_storage)
        deserialized_reports = [schema.deserialize(event)]
        rate_limiting(request, application,
                      'per_application_reports_rate_limit',
                      len(deserialized_reports))
        tasks.add_reports.delay(application.resource_id, {},
                                deserialized_reports)
    return 'OK: Events accepted'