# appenlight/models/services/log.py
# -*- coding: utf-8 -*-

# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import paginate | ||||
import logging | ||||
import sqlalchemy as sa | ||||
from appenlight.models.log import Log | ||||
from appenlight.models import get_db_session, Datastores | ||||
from appenlight.models.services.base import BaseService | ||||
from appenlight.lib.utils import es_index_name_limiter | ||||
log = logging.getLogger(__name__) | ||||
class LogService(BaseService):
    """Service layer for querying ``Log`` entries.

    Reads go against two stores: the canonical rows in Postgres (via
    SQLAlchemy) and the denormalized copies in Elasticsearch used for
    search and aggregation.
    """

    @classmethod
    def get_logs(cls, resource_ids=None, filter_settings=None, db_session=None):
        """Return a SQLAlchemy query of logs for the given resources.

        :param resource_ids: application ids to search; an empty/None value
            short-circuits and returns an empty list
        :param filter_settings: optional dict of filters — supports
            ``start_date``, ``end_date``, ``log_level``, ``request_id``
            and ``namespace``
        :param db_session: optional session override
        :returns: a ``Query`` ordered by ``timestamp`` descending, or ``[]``
            when no resource ids were supplied
        """
        # ensure we always have id's passed
        if not resource_ids:
            # raise Exception('No App ID passed')
            return []
        # guard against None so the .get() calls below are safe
        # (es_query_builder already normalizes this way; keep consistent)
        if not filter_settings:
            filter_settings = {}
        db_session = get_db_session(db_session)
        q = db_session.query(Log)
        q = q.filter(Log.resource_id.in_(resource_ids))
        if filter_settings.get("start_date"):
            q = q.filter(Log.timestamp >= filter_settings.get("start_date"))
        if filter_settings.get("end_date"):
            q = q.filter(Log.timestamp <= filter_settings.get("end_date"))
        if filter_settings.get("log_level"):
            q = q.filter(Log.log_level == filter_settings.get("log_level").upper())
        if filter_settings.get("request_id"):
            request_id = filter_settings.get("request_id", "")
            # request ids are stored without dashes
            q = q.filter(Log.request_id == request_id.replace("-", ""))
        if filter_settings.get("namespace"):
            q = q.filter(Log.namespace == filter_settings.get("namespace"))
        q = q.order_by(sa.desc(Log.timestamp))
        return q

    @classmethod
    def es_query_builder(cls, app_ids, filter_settings):
        """Build an Elasticsearch ``filtered`` query dict from filter settings.

        Supported filters: ``tags``, ``start_date``/``end_date``, ``level``,
        ``namespace``, ``request_id`` and full-text ``message`` matching.

        :param app_ids: iterable of resource ids to restrict the query to
        :param filter_settings: optional dict of filters (``None`` is
            treated as no filters)
        :returns: a dict suitable as the ``body`` of an ES search call
        """
        if not filter_settings:
            filter_settings = {}

        query = {
            "query": {
                "filtered": {
                    "filter": {"and": [{"terms": {"resource_id": list(app_ids)}}]}
                }
            }
        }

        start_date = filter_settings.get("start_date")
        end_date = filter_settings.get("end_date")
        filter_part = query["query"]["filtered"]["filter"]["and"]

        for tag in filter_settings.get("tags", []):
            # tag values are matched case-insensitively (stored lowercased)
            tag_values = [v.lower() for v in tag["value"]]
            # dots in tag names are replaced with underscores in ES mappings
            key = "tags.%s.values" % tag["name"].replace(".", "_")
            filter_part.append({"terms": {key: tag_values}})

        date_range = {"range": {"timestamp": {}}}
        if start_date:
            date_range["range"]["timestamp"]["gte"] = start_date
        if end_date:
            date_range["range"]["timestamp"]["lte"] = end_date
        if start_date or end_date:
            filter_part.append(date_range)

        levels = filter_settings.get("level")
        if levels:
            filter_part.append({"terms": {"log_level": levels}})
        namespaces = filter_settings.get("namespace")
        if namespaces:
            filter_part.append({"terms": {"namespace": namespaces}})

        request_ids = filter_settings.get("request_id")
        if request_ids:
            filter_part.append({"terms": {"request_id": request_ids}})

        messages = filter_settings.get("message")
        if messages:
            # full-text part of the filtered query; all words must match
            query["query"]["filtered"]["query"] = {
                "match": {"message": {"query": " ".join(messages), "operator": "and"}}
            }
        return query

    @classmethod
    def get_time_series_aggregate(cls, app_ids=None, filter_settings=None):
        """Return an hourly histogram of log events from Elasticsearch.

        :param app_ids: iterable of resource ids; falsy value returns ``{}``
        :param filter_settings: optional dict of filters; ``start_date`` and
            ``end_date`` also bound the histogram buckets
        :returns: the raw ES response, or ``[]`` when no matching log
            indices exist for the date range
        """
        if not app_ids:
            return {}
        # normalize before the direct .get() calls below (the original
        # crashed with AttributeError when filter_settings was None)
        if not filter_settings:
            filter_settings = {}
        es_query = cls.es_query_builder(app_ids, filter_settings)
        es_query["aggs"] = {
            "events_over_time": {
                "date_histogram": {
                    "field": "timestamp",
                    "interval": "1h",
                    # keep empty buckets so the chart has no gaps
                    "min_doc_count": 0,
                    "extended_bounds": {
                        "max": filter_settings.get("end_date"),
                        "min": filter_settings.get("start_date"),
                    },
                }
            }
        }
        log.debug(es_query)
        # restrict the search to log indices covering the requested range
        index_names = es_index_name_limiter(
            filter_settings.get("start_date"),
            filter_settings.get("end_date"),
            ixtypes=["logs"],
        )
        if index_names:
            results = Datastores.es.search(
                body=es_query, index=index_names, doc_type="log", size=0
            )
        else:
            results = []
        return results

    @classmethod
    def get_search_iterator(
        cls,
        app_ids=None,
        page=1,
        items_per_page=50,
        order_by=None,
        filter_settings=None,
        limit=None,
    ):
        """Search Elasticsearch for logs and return one page of hits.

        :param app_ids: iterable of resource ids; falsy value returns
            ``({}, 0)``
        :param page: 1-based page number used to compute the ES offset
        :param items_per_page: page size
        :param order_by: unused, kept for interface compatibility
        :param filter_settings: optional dict of filters
        :param limit: unused, kept for interface compatibility
        :returns: ``(hits_dict, count)`` where count is capped at 5000
        """
        if not app_ids:
            return {}, 0
        # normalize before the direct .get() calls below
        if not filter_settings:
            filter_settings = {}

        es_query = cls.es_query_builder(app_ids, filter_settings)
        sort_query = {"sort": [{"timestamp": {"order": "desc"}}]}
        es_query.update(sort_query)
        log.debug(es_query)
        es_from = (page - 1) * items_per_page
        index_names = es_index_name_limiter(
            filter_settings.get("start_date"),
            filter_settings.get("end_date"),
            ixtypes=["logs"],
        )
        if not index_names:
            return {}, 0

        results = Datastores.es.search(
            body=es_query,
            index=index_names,
            doc_type="log",
            size=items_per_page,
            from_=es_from,
        )
        # cap the reported total — deep pagination past 5000 hits is
        # not supported
        count = min(results["hits"]["total"], 5000)
        return results["hits"], count

    @classmethod
    def get_paginator_by_app_ids(
        cls,
        app_ids=None,
        page=1,
        item_count=None,
        items_per_page=50,
        order_by=None,
        filter_settings=None,
        exclude_columns=None,
        db_session=None,
    ):
        """Return a ``paginate.Page`` of ``Log`` ORM rows for the ES hits.

        ES is queried first for ids (and the capped total), then the
        matching rows are loaded from Postgres and re-sorted to match the
        ES result order.
        """
        if not filter_settings:
            filter_settings = {}
        results, item_count = cls.get_search_iterator(
            app_ids, page, items_per_page, order_by, filter_settings
        )
        paginator = paginate.Page(
            [], item_count=item_count, items_per_page=items_per_page, **filter_settings
        )
        ordered_ids = tuple(
            item["_source"]["pg_id"] for item in results.get("hits", [])
        )

        sorted_instance_list = []
        if ordered_ids:
            db_session = get_db_session(db_session)
            query = db_session.query(Log)
            query = query.filter(Log.log_id.in_(ordered_ids))
            query = query.order_by(sa.desc("timestamp"))
            sa_items = query.all()
            # resort by score
            for i_id in ordered_ids:
                for item in sa_items:
                    if str(item.log_id) == str(i_id):
                        sorted_instance_list.append(item)
        paginator.sa_items = sorted_instance_list
        return paginator

    @classmethod
    def query_by_primary_key_and_namespace(cls, list_of_pairs, db_session=None):
        """Return a query matching any of the given (pk, namespace) pairs.

        :param list_of_pairs: iterable of dicts with ``pk`` and ``ns`` keys
        :param db_session: optional session override
        :returns: a ``Query`` ordered by timestamp then log id, ascending
        """
        db_session = get_db_session(db_session)
        list_of_conditions = []
        query = db_session.query(Log)
        for pair in list_of_pairs:
            list_of_conditions.append(
                sa.and_(Log.primary_key == pair["pk"], Log.namespace == pair["ns"])
            )
        query = query.filter(sa.or_(*list_of_conditions))
        query = query.order_by(sa.asc(Log.timestamp), sa.asc(Log.log_id))
        return query