api.py
422 lines
| 16.2 KiB
| text/x-python
|
PythonLexer
r0 | # -*- coding: utf-8 -*- | |||
# Copyright (C) 2010-2016 RhodeCode GmbH | ||||
# | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# App Enlight Enterprise Edition, including its added features, Support | ||||
# services, and proprietary license terms, please see | ||||
# https://rhodecode.com/licenses/ | ||||
import base64 | ||||
import io | ||||
import datetime | ||||
import json | ||||
import logging | ||||
import urllib.request, urllib.parse, urllib.error | ||||
import zlib | ||||
from gzip import GzipFile | ||||
from pyramid.view import view_config | ||||
from pyramid.httpexceptions import HTTPBadRequest | ||||
import appenlight.celery.tasks as tasks | ||||
from appenlight.lib.api import rate_limiting, check_cors | ||||
from appenlight.lib.enums import ParsedSentryEventType | ||||
from appenlight.lib.utils import parse_proto | ||||
from appenlight.lib.utils.airbrake import parse_airbrake_xml | ||||
from appenlight.lib.utils.date_utils import convert_date | ||||
from appenlight.lib.utils.sentry import parse_sentry_event | ||||
from appenlight.lib.request import JSONException | ||||
from appenlight.validators import (LogListSchema, | ||||
MetricsListSchema, | ||||
GeneralMetricsListSchema, | ||||
GeneralMetricSchema, | ||||
LogListPermanentSchema, | ||||
ReportListSchema_0_5, | ||||
LogSchema, | ||||
LogSchemaPermanent, | ||||
ReportSchema_0_5) | ||||
# Module-level logger shared by the views and helpers in this file.
log = logging.getLogger(__name__)
@view_config(route_name='api_logs', renderer='string', permission='create',
             require_csrf=False)
@view_config(route_name='api_log', renderer='string', permission='create',
             require_csrf=False)
def logs_create(request):
    """
    Endpoint for log aggregation.

    The ``api_logs`` route takes a list of log entries, the ``api_log`` route
    a single entry. The payload is validated with the list/single schema,
    using the "permanent" variant when the application has
    ``allow_permanent_storage`` set.

    Entries without a ``primary_key`` are queued as they are. Entries with
    one are grouped by ``(primary_key, namespace)`` and only the entry with
    the latest ``date`` of each group is queued.

    :param request: current request; ``request.context.resource`` is the
        target application
    :return: the result of ``check_cors`` for OPTIONS requests, otherwise
        the string ``'OK: Logs accepted'``
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get('protocol_version', ''))
    payload = request.unsafe_json_body

    # pick the validator: list vs. single entry, permanent vs. regular
    is_batch = request.matched_route.name == 'api_logs'
    if is_batch:
        if application.allow_permanent_storage:
            schema_cls = LogListPermanentSchema
        else:
            schema_cls = LogListSchema
    elif application.allow_permanent_storage:
        schema_cls = LogSchemaPermanent
    else:
        schema_cls = LogSchema
    schema = schema_cls().bind(utcnow=datetime.datetime.utcnow())

    entries = schema.deserialize(payload)
    if not is_batch:
        # normalize the single-entry route to a one element list
        entries = [entries]

    rate_limiting(request, application, 'per_application_logs_rate_limit',
                  len(entries))

    # split the entries: keyed ones are grouped so they can be collapsed,
    # the rest are passed through unchanged
    plain_logs = []
    keyed_groups = {}
    for entry in entries:
        if entry['primary_key']:
            group_key = (entry['primary_key'], entry['namespace'],)
            keyed_groups.setdefault(group_key, []).append(entry)
        else:
            plain_logs.append(entry)

    if plain_logs:
        log.debug('%s non-pkey logs received: %s' % (application,
                                                     len(plain_logs)))
        tasks.add_logs.delay(application.resource_id, params, plain_logs)

    if keyed_groups:
        # keep only the most recent entry of every (primary_key, namespace)
        newest_logs = [
            sorted(group, key=lambda item: item['date'])[-1]
            for group in keyed_groups.values()
        ]
        log.debug('%s pkey logs received: %s' % (application,
                                                 len(newest_logs)))
        tasks.add_logs.delay(application.resource_id, params, newest_logs)

    log.info('LOG call %s %s client:%s' % (
        application, proto_version, request.headers.get('user_agent')))
    return 'OK: Logs accepted'
@view_config(route_name='api_request_stats', renderer='string',
             permission='create', require_csrf=False)
@view_config(route_name='api_metrics', renderer='string',
             permission='create', require_csrf=False)
def request_metrics_create(request):
    """
    Endpoint for performance metrics, aggregates view performance stats
    and converts them to general metric rows.

    Incoming rows are validated with ``MetricsListSchema`` and summed per
    ``(server name, minute, view name)`` so that several rows reported for
    the same minute end up as a single metric. The aggregated rows are then
    queued through ``tasks.add_metrics`` under the
    ``appenlight.request_metric`` namespace.

    :param request: current request; ``request.context.resource`` is the
        target application
    :return: the result of ``check_cors`` for OPTIONS requests, otherwise
        the string ``'OK: request metrics accepted'``
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    else:
        check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get('protocol_version', ''))

    payload = request.unsafe_json_body
    schema = MetricsListSchema()
    dataset = schema.deserialize(payload)

    rate_limiting(request, application, 'per_application_metrics_rate_limit',
                  len(dataset))

    # metrics dict is being built to minimize the amount of queries used
    # in case we get multiple rows from same minute
    metrics = {}
    for metric in dataset:
        server_name = metric.get('server', '').lower() or 'unknown'
        start_interval = convert_date(metric['timestamp'])
        # truncate to the minute - that is the aggregation resolution
        start_interval = start_interval.replace(second=0, microsecond=0)
        for view_name, view_metrics in metric['metrics']:
            # The key uses the normalized server name (the value that is
            # actually stored) and is a tuple so parts cannot run together.
            key = (server_name, start_interval, view_name)
            # Membership must be tested with the same key used for storing;
            # testing ``start_interval`` here would reset the bucket on
            # every row and silently disable the aggregation.
            if key not in metrics:
                metrics[key] = {"requests": 0, "main": 0, "sql": 0,
                                "nosql": 0, "remote": 0, "tmpl": 0,
                                "custom": 0, 'sql_calls': 0,
                                'nosql_calls': 0,
                                'remote_calls': 0, 'tmpl_calls': 0,
                                'custom_calls': 0,
                                "start_interval": start_interval,
                                "server_name": server_name,
                                "view_name": view_name
                                }
            bucket = metrics[key]
            bucket["requests"] += int(view_metrics['requests'])
            bucket["main"] += round(view_metrics['main'], 5)
            bucket["sql"] += round(view_metrics['sql'], 5)
            bucket["nosql"] += round(view_metrics['nosql'], 5)
            bucket["remote"] += round(view_metrics['remote'], 5)
            bucket["tmpl"] += round(view_metrics['tmpl'], 5)
            bucket["custom"] += round(view_metrics.get('custom', 0.0), 5)
            bucket["sql_calls"] += int(view_metrics.get('sql_calls', 0))
            bucket["nosql_calls"] += int(view_metrics.get('nosql_calls', 0))
            bucket["remote_calls"] += int(
                view_metrics.get('remote_calls', 0))
            bucket["tmpl_calls"] += int(view_metrics.get('tmpl_calls', 0))
            bucket["custom_calls"] += int(
                view_metrics.get('custom_calls', 0))

    normalized_metrics = []
    for metric in metrics.values():
        if not metric["requests"]:
            # fix this here because validator can't; applied to the
            # aggregated total so the result does not depend on row order
            metric["requests"] = 1
        new_metric = {
            'namespace': 'appenlight.request_metric',
            # popped so the timestamp is not repeated inside the tags
            'timestamp': metric.pop('start_interval'),
            'server_name': metric['server_name'],
            'tags': list(metric.items())
        }
        normalized_metrics.append(new_metric)

    tasks.add_metrics.delay(application.resource_id, params,
                            normalized_metrics, proto_version)

    log.info('REQUEST METRICS call {} {} client:{}'.format(
        application.resource_name, proto_version,
        request.headers.get('user_agent')))
    return 'OK: request metrics accepted'
@view_config(route_name='api_general_metrics', renderer='string',
             permission='create', require_csrf=False)
@view_config(route_name='api_general_metric', renderer='string',
             permission='create', require_csrf=False)
def general_metrics_create(request):
    """
    Endpoint for general metrics aggregation.

    The ``api_general_metrics`` route takes a list of metrics, the
    ``api_general_metric`` route a single one. After validation the metrics
    are queued through ``tasks.add_metrics``.

    :param request: current request; ``request.context.resource`` is the
        target application
    :return: the result of ``check_cors`` for OPTIONS requests, otherwise
        the string ``'OK: Metrics accepted'``
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get('protocol_version', ''))
    payload = request.unsafe_json_body

    is_batch = request.matched_route.name == 'api_general_metrics'
    schema_cls = GeneralMetricsListSchema if is_batch else GeneralMetricSchema
    schema = schema_cls().bind(utcnow=datetime.datetime.utcnow())

    metric_rows = schema.deserialize(payload)
    if not is_batch:
        # normalize the single-metric route to a one element list
        metric_rows = [metric_rows]

    rate_limiting(request, application, 'per_application_metrics_rate_limit',
                  len(metric_rows))

    tasks.add_metrics.delay(application.resource_id, params,
                            metric_rows, proto_version)

    client = request.headers.get('user_agent')
    log.info('METRICS call {} {} client:{}'.format(
        application.resource_name, proto_version, client))
    return 'OK: Metrics accepted'
@view_config(route_name='api_reports', renderer='string', permission='create',
             require_csrf=False)
@view_config(route_name='api_slow_reports', renderer='string',
             permission='create', require_csrf=False)
@view_config(route_name='api_report', renderer='string', permission='create',
             require_csrf=False)
def reports_create(request):
    """
    Endpoint for exception and slowness reports.

    Only the ``api_reports`` route is validated as a list of reports; the
    other routes bound to this view are validated as a single report and
    wrapped in a list. A non-empty result is rate limited and queued through
    ``tasks.add_reports``.

    :param request: current request; ``request.context.resource`` is the
        target application
    :return: the result of ``check_cors`` for OPTIONS requests, otherwise
        the string ``'OK: Reports accepted'``
    """
    # route_url('reports')
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    check_cors(request, application, should_return=False)

    params = dict(request.params.copy())
    proto_version = parse_proto(params.get('protocol_version', ''))
    payload = request.unsafe_json_body

    is_batch = request.matched_route.name == 'api_reports'
    schema_cls = ReportListSchema_0_5 if is_batch else ReportSchema_0_5
    schema = schema_cls().bind(utcnow=datetime.datetime.utcnow())

    reports = schema.deserialize(payload)
    if not is_batch:
        reports = [reports]

    if reports:
        rate_limiting(request, application,
                      'per_application_reports_rate_limit',
                      len(reports))
        tasks.add_reports.delay(application.resource_id, params, reports)

    client = request.headers.get('user_agent')
    log.info('REPORT call %s, %s client:%s' % (
        application, proto_version, client))
    return 'OK: Reports accepted'
@view_config(route_name='api_airbrake', renderer='string', permission='create',
             require_csrf=False)
def airbrake_xml_compat(request):
    """
    Airbrake compatible endpoint for XML reports.

    The request is parsed with ``parse_airbrake_xml``, validated as a one
    element report list and queued through ``tasks.add_reports``.

    :param request: current request; ``request.context.resource`` is the
        target application
    :return: the result of ``check_cors`` for OPTIONS requests, otherwise an
        XML ``<notice>`` string containing the ``mailing.app_url`` setting
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    else:
        check_cors(request, application, should_return=False)

    # Hand the task a plain dict, exactly like the other endpoints in this
    # module do, instead of the request's own params container.
    params = dict(request.params.copy())

    error_dict = parse_airbrake_xml(request)
    schema = ReportListSchema_0_5().bind(utcnow=datetime.datetime.utcnow())
    deserialized_reports = schema.deserialize([error_dict])
    rate_limiting(request, application, 'per_application_reports_rate_limit',
                  len(deserialized_reports))

    tasks.add_reports.delay(application.resource_id, params,
                            deserialized_reports)
    log.info('%s AIRBRAKE call for application %s, api_ver:%s client:%s' % (
        500, application.resource_name,
        request.params.get('protocol_version', 'unknown'),
        request.headers.get('user_agent'))
    )
    return '<notice><id>no-id</id><url>%s</url></notice>' % \
           request.registry.settings['mailing.app_url']
def decompress_gzip(data):
    """
    Decompress a gzip encoded payload.

    :param data: gzip compressed bytes
    :return: the decompressed bytes
    :raises HTTPBadRequest: when ``data`` cannot be decompressed
    """
    try:
        # gzip works on binary data, so the buffer has to be BytesIO;
        # StringIO rejects bytes on Python 3.
        with GzipFile(fileobj=io.BytesIO(data)) as f:
            return f.read()
    except Exception as exc:
        # A payload that does not decompress is a client error: log it and
        # answer with 400 instead of letting the error escape as a 500.
        log.error(exc)
        raise HTTPBadRequest() from exc
def decompress_zlib(data):
    """
    Decompress a zlib (deflate) encoded payload.

    :param data: zlib compressed bytes
    :return: the decompressed bytes
    :raises HTTPBadRequest: when ``data`` cannot be decompressed
    """
    try:
        return zlib.decompress(data)
    except Exception as exc:
        # The original bare ``raise`` made this handler unreachable; a
        # payload that does not decompress is reported as a 400.
        log.error(exc)
        raise HTTPBadRequest() from exc
def decode_b64(data):
    """
    Decode a base64 encoded payload.

    :param data: base64 encoded bytes or ASCII string
    :return: the decoded bytes
    :raises HTTPBadRequest: when ``data`` is not valid base64
    """
    try:
        return base64.b64decode(data)
    except Exception as exc:
        # The original bare ``raise`` made this handler unreachable; input
        # that does not decode is reported as a 400.
        log.error(exc)
        raise HTTPBadRequest() from exc
@view_config(route_name='api_sentry', renderer='string', permission='create',
             require_csrf=False)
@view_config(route_name='api_sentry_slash', renderer='string',
             permission='create', require_csrf=False)
def sentry_compat(request):
    """
    Sentry compatible endpoint.

    The body may arrive gzip or deflate compressed (``Content-Encoding``),
    url-quoted (``application/x-www-form-urlencoded``) and/or as base64
    wrapped zlib data; it is unwrapped until it can be parsed as JSON.
    The parsed event is converted with ``parse_sentry_event`` and queued as
    a log entry or as an error report, depending on the detected type.

    :param request: current request; ``request.context.resource`` is the
        target application
    :return: the result of ``check_cors`` for OPTIONS requests, otherwise
        the string ``'OK: Events accepted'``
    :raises JSONException: when the unwrapped body is not valid JSON
    """
    application = request.context.resource
    if request.method.upper() == 'OPTIONS':
        return check_cors(request, application)
    else:
        check_cors(request, application, should_return=False)

    # handle various report encoding
    content_encoding = request.headers.get('Content-Encoding')
    content_type = request.headers.get('Content-Type')
    if content_encoding == 'gzip':
        body = decompress_gzip(request.body)
    elif content_encoding == 'deflate':
        body = decompress_zlib(request.body)
    else:
        body = request.body
    # attempt to fix string before decoding for stupid clients
    if content_type == 'application/x-www-form-urlencoded':
        # note: from here on body is text, not bytes
        body = urllib.parse.unquote(body.decode('utf8'))
    check_char = '{' if isinstance(body, str) else b'{'
    if not body.startswith(check_char):
        # not raw JSON - try the base64 + zlib wrapping; this is best
        # effort, on failure the JSON parsing below rejects the body
        try:
            body = decode_b64(body)
            body = decompress_zlib(body)
        except Exception as exc:
            log.info(exc)

    try:
        # body is text after the urlencoded branch and bytes otherwise;
        # only bytes need decoding (UnicodeDecodeError is a ValueError,
        # so undecodable input is rejected as incorrect JSON as well)
        if isinstance(body, bytes):
            body = body.decode('utf8')
        json_body = json.loads(body)
    except ValueError:
        raise JSONException("Incorrect JSON")

    event, event_type = parse_sentry_event(json_body)

    if event_type == ParsedSentryEventType.LOG:
        schema = LogSchema().bind(utcnow=datetime.datetime.utcnow())
        deserialized_logs = schema.deserialize(event)
        non_pkey_logs = [deserialized_logs]
        log.debug('%s non-pkey logs received: %s' % (application,
                                                     len(non_pkey_logs)))
        tasks.add_logs.delay(application.resource_id, {}, non_pkey_logs)
    if event_type == ParsedSentryEventType.ERROR_REPORT:
        schema = ReportSchema_0_5().bind(utcnow=datetime.datetime.utcnow())
        deserialized_reports = [schema.deserialize(event)]
        rate_limiting(request, application,
                      'per_application_reports_rate_limit',
                      len(deserialized_reports))
        tasks.add_reports.delay(application.resource_id, {},
                                deserialized_reports)
    return 'OK: Events accepted'