elasticsearch: replace pyelasticsearch with elasticsearch
ergo
@@ -1,49 +1,49 @@
1 1 repoze.sendmail==4.1
2 2 pyramid==1.10.2
3 3 pyramid_tm==0.12
4 4 pyramid_debugtoolbar
5 5 pyramid_authstack==1.0.1
6 6 SQLAlchemy==1.0.12
7 7 alembic==1.0.8
8 8 webhelpers2==2.0
9 9 transaction==1.4.3
10 10 zope.sqlalchemy==0.7.6
11 11 pyramid_mailer==0.14.1
12 12 redis==2.10.5
13 13 redlock-py==1.0.8
14 14 pyramid_jinja2==2.6.2
15 15 psycopg2==2.7.7
16 16 wtforms==2.1
17 17 celery==3.1.23
18 18 formencode==1.3.0
19 19 psutil==2.1.2
20 20 ziggurat_foundations==0.8.3
21 21 bcrypt==3.1.6
22 22 appenlight_client
23 23 markdown==2.5
24 24 colander==1.7
25 25 defusedxml==0.5.0
26 26 dogpile.cache==0.5.7
27 27 pyramid_redis_sessions==1.0.1
28 28 simplejson==3.8.2
29 29 waitress==1.0
30 30 gunicorn==19.9.0
31 31 requests==2.20.0
32 32 requests_oauthlib==0.6.1
33 33 gevent==1.1.1
34 34 gevent-websocket==0.9.5
35 35 pygments==2.1.3
36 36 lxml==4.3.2
37 37 paginate==0.5.4
38 38 paginate-sqlalchemy==0.2.0
39 pyelasticsearch==1.4
39 elasticsearch>=2.0.0,<3.0.0
40 40 six>=1.10.0
41 41 mock==1.0.1
42 42 itsdangerous==1.1.0
43 43 camplight==0.9.6
44 44 jira==1.0.7
45 45 python-dateutil==2.5.3
46 46 authomatic==0.1.0.post1
47 47 cryptography==2.6.1
48 48 webassets==0.11.1
49 49
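Note on the dependency swap above: the unofficial `pyelasticsearch` wrapper is dropped in favour of the official `elasticsearch` (elasticsearch-py) client, pinned to the 2.x series - the usual convention is that the client's major version tracks the server's major version. A minimal, hedged sanity check (not part of the diff):

```python
# Not part of the diff - assumes the official elasticsearch-py package and a
# 2.x cluster; the client's major version should match the server's.
import elasticsearch

assert elasticsearch.VERSION[0] == 2, "expected a 2.x client for a 2.x cluster"
```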
@@ -1,225 +1,225 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import datetime
18 18 import logging
19 import pyelasticsearch
19 from elasticsearch import Elasticsearch
20 20 import redis
21 21 import os
22 22 import pkg_resources
23 23 from pkg_resources import iter_entry_points
24 24
25 25 import appenlight.lib.jinja2_filters as jinja2_filters
26 26 import appenlight.lib.encryption as encryption
27 27
28 28 from pyramid.config import PHASE3_CONFIG
29 29 from pyramid.authentication import AuthTktAuthenticationPolicy
30 30 from pyramid.authorization import ACLAuthorizationPolicy
31 31 from pyramid_mailer.interfaces import IMailer
32 32 from pyramid.renderers import JSON
33 33 from pyramid_redis_sessions import session_factory_from_settings
34 34 from pyramid.settings import asbool, aslist
35 35 from pyramid.security import AllPermissionsList
36 36 from pyramid_authstack import AuthenticationStackPolicy
37 37 from redlock import Redlock
38 38 from sqlalchemy import engine_from_config
39 39
40 40 from appenlight.celery import configure_celery
41 41 from appenlight.lib.configurator import (CythonCompatConfigurator,
42 42 register_appenlight_plugin)
43 43 from appenlight.lib import cache_regions
44 44 from appenlight.lib.ext_json import json
45 45 from appenlight.security import groupfinder, AuthTokenAuthenticationPolicy
46 46
47 47 __license__ = 'Apache 2.0'
48 48 __author__ = 'RhodeCode GmbH'
49 49 __url__ = 'http://rhodecode.com'
50 50 __version__ = pkg_resources.get_distribution("appenlight").parsed_version
51 51
52 52 json_renderer = JSON(serializer=json.dumps, indent=4)
53 53
54 54 log = logging.getLogger(__name__)
55 55
56 56
57 57 def datetime_adapter(obj, request):
58 58 return obj.isoformat()
59 59
60 60
61 61 def all_permissions_adapter(obj, request):
62 62 return '__all_permissions__'
63 63
64 64
65 65 json_renderer.add_adapter(datetime.datetime, datetime_adapter)
66 66 json_renderer.add_adapter(AllPermissionsList, all_permissions_adapter)
67 67
68 68
69 69 def main(global_config, **settings):
70 70 """ This function returns a Pyramid WSGI application.
71 71 """
72 72 auth_tkt_policy = AuthTktAuthenticationPolicy(
73 73 settings['authtkt.secret'],
74 74 hashalg='sha512',
75 75 callback=groupfinder,
76 76 max_age=2592000,
77 77 secure=asbool(settings.get('authtkt.secure', 'false')))
78 78 auth_token_policy = AuthTokenAuthenticationPolicy(
79 79 callback=groupfinder
80 80 )
81 81 authorization_policy = ACLAuthorizationPolicy()
82 82 authentication_policy = AuthenticationStackPolicy()
83 83 authentication_policy.add_policy('auth_tkt', auth_tkt_policy)
84 84 authentication_policy.add_policy('auth_token', auth_token_policy)
85 85 # set crypto key
86 86 encryption.ENCRYPTION_SECRET = settings.get('encryption_secret')
87 87     # import this later so the encryption key can be monkeypatched
88 88 from appenlight.models import DBSession, register_datastores
89 89
90 90 # registration
91 91 settings['appenlight.disable_registration'] = asbool(
92 92 settings.get('appenlight.disable_registration'))
93 93
94 94 # update config with cometd info
95 95 settings['cometd_servers'] = {'server': settings['cometd.server'],
96 96 'secret': settings['cometd.secret']}
97 97
98 98 # Create the Pyramid Configurator.
99 99 settings['_mail_url'] = settings['mailing.app_url']
100 100 config = CythonCompatConfigurator(
101 101 settings=settings,
102 102 authentication_policy=authentication_policy,
103 103 authorization_policy=authorization_policy,
104 104 root_factory='appenlight.security.RootFactory',
105 105 default_permission='view')
106 106 # custom registry variables
107 107
108 108 # resource type information
109 109 config.registry.resource_types = ['resource', 'application']
110 110 # plugin information
111 111 config.registry.appenlight_plugins = {}
112 112
113 113 config.set_default_csrf_options(require_csrf=True, header='X-XSRF-TOKEN')
114 114 config.add_view_deriver('appenlight.predicates.csrf_view',
115 115 name='csrf_view')
116 116
117 117 # later, when config is available
118 118 dogpile_config = {'url': settings['redis.url'],
119 119 "redis_expiration_time": 86400,
120 120 "redis_distributed_lock": True}
121 121 cache_regions.regions = cache_regions.CacheRegions(dogpile_config)
122 122 config.registry.cache_regions = cache_regions.regions
123 123 engine = engine_from_config(settings, 'sqlalchemy.',
124 124 json_serializer=json.dumps)
125 125 DBSession.configure(bind=engine)
126 126
127 127     # JSON renderer that serializes datetime
128 128 config.add_renderer('json', json_renderer)
129 129 config.add_request_method('appenlight.lib.request.es_conn', 'es_conn', property=True)
130 130 config.add_request_method('appenlight.lib.request.get_user', 'user',
131 131 reify=True, property=True)
132 132 config.add_request_method('appenlight.lib.request.get_csrf_token',
133 133 'csrf_token', reify=True, property=True)
134 134 config.add_request_method('appenlight.lib.request.safe_json_body',
135 135 'safe_json_body', reify=True, property=True)
136 136 config.add_request_method('appenlight.lib.request.unsafe_json_body',
137 137 'unsafe_json_body', reify=True, property=True)
138 138 config.add_request_method('appenlight.lib.request.add_flash_to_headers',
139 139 'add_flash_to_headers')
140 140 config.add_request_method('appenlight.lib.request.get_authomatic',
141 141 'authomatic', reify=True)
142 142
143 143 config.include('pyramid_redis_sessions')
144 144 config.include('pyramid_tm')
145 145 config.include('pyramid_jinja2')
146 146 config.include('pyramid_mailer')
147 147 config.include('appenlight_client.ext.pyramid_tween')
148 148 config.include('ziggurat_foundations.ext.pyramid.sign_in')
149 149 es_server_list = aslist(settings['elasticsearch.nodes'])
150 150 redis_url = settings['redis.url']
151 151 log.warning('Elasticsearch server list: {}'.format(es_server_list))
152 152 log.warning('Redis server: {}'.format(redis_url))
153 config.registry.es_conn = pyelasticsearch.ElasticSearch(es_server_list)
153 config.registry.es_conn = Elasticsearch(es_server_list)
154 154 config.registry.redis_conn = redis.StrictRedis.from_url(redis_url)
155 155
156 156 config.registry.redis_lockmgr = Redlock([settings['redis.redlock.url'], ],
157 157 retry_count=0, retry_delay=0)
158 158 # mailer bw compat
159 159 config.registry.mailer = config.registry.getUtility(IMailer)
160 160
161 161 # Configure sessions
162 162 session_factory = session_factory_from_settings(settings)
163 163 config.set_session_factory(session_factory)
164 164
165 165 # Configure renderers and event subscribers
166 166 config.add_jinja2_extension('jinja2.ext.loopcontrols')
167 167 config.add_jinja2_search_path('appenlight:templates')
168 168 # event subscribers
169 169 config.add_subscriber("appenlight.subscribers.application_created",
170 170 "pyramid.events.ApplicationCreated")
171 171 config.add_subscriber("appenlight.subscribers.add_renderer_globals",
172 172 "pyramid.events.BeforeRender")
173 173 config.add_subscriber('appenlight.subscribers.new_request',
174 174 'pyramid.events.NewRequest')
175 175 config.add_view_predicate('context_type_class',
176 176 'appenlight.predicates.contextTypeClass')
177 177
178 178 register_datastores(es_conn=config.registry.es_conn,
179 179 redis_conn=config.registry.redis_conn,
180 180 redis_lockmgr=config.registry.redis_lockmgr)
181 181
182 182 # base stuff and scan
183 183
184 184 # need to ensure webassets exists otherwise config.override_asset()
185 185 # throws exception
186 186 if not os.path.exists(settings['webassets.dir']):
187 187 os.mkdir(settings['webassets.dir'])
188 188 config.add_static_view(path='appenlight:webassets',
189 189 name='static', cache_max_age=3600)
190 190 config.override_asset(to_override='appenlight:webassets/',
191 191 override_with=settings['webassets.dir'])
192 192
193 193 config.include('appenlight.views')
194 194 config.include('appenlight.views.admin')
195 195 config.scan(ignore=['appenlight.migrations', 'appenlight.scripts',
196 196 'appenlight.tests'])
197 197
198 198 config.add_directive('register_appenlight_plugin',
199 199 register_appenlight_plugin)
200 200
201 201 for entry_point in iter_entry_points(group='appenlight.plugins'):
202 202 plugin = entry_point.load()
203 203 plugin.includeme(config)
204 204
205 205     # include other appenlight plugins explicitly if needed
206 206 includes = aslist(settings.get('appenlight.includes', []))
207 207 for inc in includes:
208 208 config.include(inc)
209 209
210 210 # run this after everything registers in configurator
211 211
212 212 def pre_commit():
213 213 jinja_env = config.get_jinja2_environment()
214 214 jinja_env.filters['tojson'] = json.dumps
215 215 jinja_env.filters['toJSONUnsafe'] = jinja2_filters.toJSONUnsafe
216 216
217 217 config.action(None, pre_commit, order=PHASE3_CONFIG + 999)
218 218
219 219 def wrap_config_celery():
220 220 configure_celery(config.registry)
221 221
222 222 config.action(None, wrap_config_celery, order=PHASE3_CONFIG + 999)
223 223
224 224 app = config.make_wsgi_app()
225 225 return app
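The only functional changes in this hunk are the import swap and the client construction: `pyelasticsearch.ElasticSearch(es_server_list)` becomes `Elasticsearch(es_server_list)`. A hedged sketch of the new construction (the node URL below is a placeholder; elasticsearch-py accepts host strings, URLs, or dicts):

```python
from elasticsearch import Elasticsearch

# before: pyelasticsearch.ElasticSearch(es_server_list)
# after:  the official client, same list-of-nodes argument
es_conn = Elasticsearch(["http://127.0.0.1:9200"])  # placeholder node
print(es_conn.info())  # simple round-trip check against the cluster
```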
@@ -1,672 +1,657 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import bisect
18 18 import collections
19 19 import math
20 20 from datetime import datetime, timedelta
21 21
22 22 import sqlalchemy as sa
23 import pyelasticsearch
23 import elasticsearch.exceptions
24 import elasticsearch.helpers
24 25
25 26 from celery.utils.log import get_task_logger
26 27 from zope.sqlalchemy import mark_changed
27 28 from pyramid.threadlocal import get_current_request, get_current_registry
28 29 from ziggurat_foundations.models.services.resource import ResourceService
29 30
30 31 from appenlight.celery import celery
31 32 from appenlight.models.report_group import ReportGroup
32 33 from appenlight.models import DBSession, Datastores
33 34 from appenlight.models.report import Report
34 35 from appenlight.models.log import Log
35 36 from appenlight.models.metric import Metric
36 37 from appenlight.models.event import Event
37 38
38 39 from appenlight.models.services.application import ApplicationService
39 40 from appenlight.models.services.event import EventService
40 41 from appenlight.models.services.log import LogService
41 42 from appenlight.models.services.report import ReportService
42 43 from appenlight.models.services.report_group import ReportGroupService
43 44 from appenlight.models.services.user import UserService
44 45 from appenlight.models.tag import Tag
45 46 from appenlight.lib import print_traceback
46 47 from appenlight.lib.utils import parse_proto, in_batches
47 48 from appenlight.lib.ext_json import json
48 49 from appenlight.lib.redis_keys import REDIS_KEYS
49 50 from appenlight.lib.enums import ReportType
50 51
51 52 log = get_task_logger(__name__)
52 53
53 54 sample_boundries = list(range(100, 1000, 100)) + \
54 55 list(range(1000, 10000, 1000)) + \
55 56 list(range(10000, 100000, 5000))
56 57
57 58
58 59 def pick_sample(total_occurences, report_type=None):
59 60 every = 1.0
60 61 position = bisect.bisect_left(sample_boundries, total_occurences)
61 62 if position > 0:
62 63 if report_type == ReportType.not_found:
63 64 divide = 10.0
64 65 else:
65 66 divide = 100.0
66 67 every = sample_boundries[position - 1] / divide
67 68 return total_occurences % every == 0
68 69
69 70
70 71 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
71 72 def test_exception_task():
72 73 log.error('test celery log', extra={'location': 'celery'})
73 74 log.warning('test celery log', extra={'location': 'celery'})
74 75 raise Exception('Celery exception test')
75 76
76 77
77 78 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
78 79 def test_retry_exception_task():
79 80 try:
80 81 import time
81 82
82 83 time.sleep(1.3)
83 84 log.error('test retry celery log', extra={'location': 'celery'})
84 85 log.warning('test retry celery log', extra={'location': 'celery'})
85 86 raise Exception('Celery exception test')
86 87 except Exception as exc:
87 88 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
88 89 raise
89 90 test_retry_exception_task.retry(exc=exc)
90 91
91 92
92 93 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
93 94 def add_reports(resource_id, request_params, dataset, **kwargs):
94 95 proto_version = parse_proto(request_params.get('protocol_version', ''))
95 96 current_time = datetime.utcnow().replace(second=0, microsecond=0)
96 97 try:
97 98         # we will store ES docs here for a single bulk insert
98 99 es_report_docs = {}
99 100 es_report_group_docs = {}
100 101 resource = ApplicationService.by_id(resource_id)
101 102
102 103 tags = []
103 104 es_slow_calls_docs = {}
104 105 es_reports_stats_rows = {}
105 106 for report_data in dataset:
106 107 # build report details for later
107 108 added_details = 0
108 109 report = Report()
109 110 report.set_data(report_data, resource, proto_version)
110 111 report._skip_ft_index = True
111 112
112 113 # find latest group in this months partition
113 114 report_group = ReportGroupService.by_hash_and_resource(
114 115 report.resource_id,
115 116 report.grouping_hash,
116 117 since_when=datetime.utcnow().date().replace(day=1)
117 118 )
118 119 occurences = report_data.get('occurences', 1)
119 120 if not report_group:
120 121 # total reports will be +1 moment later
121 122 report_group = ReportGroup(grouping_hash=report.grouping_hash,
122 123 occurences=0, total_reports=0,
123 124 last_report=0,
124 125 priority=report.priority,
125 126 error=report.error,
126 127 first_timestamp=report.start_time)
127 128 report_group._skip_ft_index = True
128 129 report_group.report_type = report.report_type
129 130 report.report_group_time = report_group.first_timestamp
130 131 add_sample = pick_sample(report_group.occurences,
131 132 report_type=report_group.report_type)
132 133 if add_sample:
133 134 resource.report_groups.append(report_group)
134 135 report_group.reports.append(report)
135 136 added_details += 1
136 137 DBSession.flush()
137 138 if report.partition_id not in es_report_docs:
138 139 es_report_docs[report.partition_id] = []
139 140 es_report_docs[report.partition_id].append(report.es_doc())
140 141 tags.extend(list(report.tags.items()))
141 142 slow_calls = report.add_slow_calls(report_data, report_group)
142 143 DBSession.flush()
143 144 for s_call in slow_calls:
144 145 if s_call.partition_id not in es_slow_calls_docs:
145 146 es_slow_calls_docs[s_call.partition_id] = []
146 147 es_slow_calls_docs[s_call.partition_id].append(
147 148 s_call.es_doc())
148 149 # try generating new stat rows if needed
149 150 else:
150 151 # required for postprocessing to not fail later
151 152 report.report_group = report_group
152 153
153 154 stat_row = ReportService.generate_stat_rows(
154 155 report, resource, report_group)
155 156 if stat_row.partition_id not in es_reports_stats_rows:
156 157 es_reports_stats_rows[stat_row.partition_id] = []
157 158 es_reports_stats_rows[stat_row.partition_id].append(
158 159 stat_row.es_doc())
159 160
160 161             # see if we should mark 10th occurrence of report
161 162 last_occurences_10 = int(math.floor(report_group.occurences / 10))
162 163 curr_occurences_10 = int(math.floor(
163 164 (report_group.occurences + report.occurences) / 10))
164 165 last_occurences_100 = int(
165 166 math.floor(report_group.occurences / 100))
166 167 curr_occurences_100 = int(math.floor(
167 168 (report_group.occurences + report.occurences) / 100))
168 169 notify_occurences_10 = last_occurences_10 != curr_occurences_10
169 170 notify_occurences_100 = last_occurences_100 != curr_occurences_100
170 171 report_group.occurences = ReportGroup.occurences + occurences
171 172 report_group.last_timestamp = report.start_time
172 173 report_group.summed_duration = ReportGroup.summed_duration + report.duration
173 174 summed_duration = ReportGroup.summed_duration + report.duration
174 175 summed_occurences = ReportGroup.occurences + occurences
175 176 report_group.average_duration = summed_duration / summed_occurences
176 177 report_group.run_postprocessing(report)
177 178 if added_details:
178 179 report_group.total_reports = ReportGroup.total_reports + 1
179 180 report_group.last_report = report.id
180 181 report_group.set_notification_info(notify_10=notify_occurences_10,
181 182 notify_100=notify_occurences_100)
182 183 DBSession.flush()
183 184 report_group.get_report().notify_channel(report_group)
184 185 if report_group.partition_id not in es_report_group_docs:
185 186 es_report_group_docs[report_group.partition_id] = []
186 187 es_report_group_docs[report_group.partition_id].append(
187 188 report_group.es_doc())
188 189
189 190 action = 'REPORT'
190 191 log_msg = '%s: %s %s, client: %s, proto: %s' % (
191 192 action,
192 193 report_data.get('http_status', 'unknown'),
193 194 str(resource),
194 195 report_data.get('client'),
195 196 proto_version)
196 197 log.info(log_msg)
197 198 total_reports = len(dataset)
198 199 redis_pipeline = Datastores.redis.pipeline(transaction=False)
199 200 key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
200 201 redis_pipeline.incr(key, total_reports)
201 202 redis_pipeline.expire(key, 3600 * 24)
202 203 key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
203 204 resource.owner_user_id, current_time)
204 205 redis_pipeline.incr(key, total_reports)
205 206 redis_pipeline.expire(key, 3600)
206 207 key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
207 208 resource_id, current_time.replace(minute=0))
208 209 redis_pipeline.incr(key, total_reports)
209 210 redis_pipeline.expire(key, 3600 * 24 * 7)
210 211 redis_pipeline.sadd(
211 212 REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
212 213 current_time.replace(minute=0)), resource_id)
213 214 redis_pipeline.execute()
214 215
215 216 add_reports_es(es_report_group_docs, es_report_docs)
216 217 add_reports_slow_calls_es(es_slow_calls_docs)
217 218 add_reports_stats_rows_es(es_reports_stats_rows)
218 219 return True
219 220 except Exception as exc:
220 221 print_traceback(log)
221 222 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
222 223 raise
223 224 add_reports.retry(exc=exc)
224 225
225 226
226 227 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
227 228 def add_reports_es(report_group_docs, report_docs):
228 229 for k, v in report_group_docs.items():
229 Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
230 to_update = {'_index': k, '_type': 'report_group'}
231 [i.update(to_update) for i in v]
232 elasticsearch.helpers.bulk(Datastores.es, v)
230 233 for k, v in report_docs.items():
231 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
232 parent_field='_parent')
234 to_update = {'_index': k, '_type': 'report'}
235 [i.update(to_update) for i in v]
236 elasticsearch.helpers.bulk(Datastores.es, v)
233 237
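The former `bulk_index(index, doc_type, docs, id_field="_id")` calls are replaced throughout by the official bulk helper, which reads the target index and type from each action dict - hence every document gets `_index`/`_type` stamped on before the call, as above. A hedged sketch of that pattern (the helper function name is illustrative only):

```python
import elasticsearch.helpers

def bulk_index(es_client, index_name, doc_type, docs):
    # stamp routing metadata onto each doc; remaining keys (including any
    # '_id') are picked up by the helper as document metadata/source
    for doc in docs:
        doc.update({'_index': index_name, '_type': doc_type})
    # raises BulkIndexError on failures unless raise_on_error=False is passed
    elasticsearch.helpers.bulk(es_client, docs)
```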
234 238
235 239 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
236 240 def add_reports_slow_calls_es(es_docs):
237 241 for k, v in es_docs.items():
238 Datastores.es.bulk_index(k, 'log', v)
242 to_update = {'_index': k, '_type': 'log'}
243 [i.update(to_update) for i in v]
244 elasticsearch.helpers.bulk(Datastores.es, v)
239 245
240 246
241 247 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
242 248 def add_reports_stats_rows_es(es_docs):
243 249 for k, v in es_docs.items():
244 Datastores.es.bulk_index(k, 'log', v)
250 to_update = {'_index': k, '_type': 'log'}
251 [i.update(to_update) for i in v]
252 elasticsearch.helpers.bulk(Datastores.es, v)
245 253
246 254
247 255 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
248 256 def add_logs(resource_id, request_params, dataset, **kwargs):
249 257 proto_version = request_params.get('protocol_version')
250 258 current_time = datetime.utcnow().replace(second=0, microsecond=0)
251 259
252 260 try:
253 261 es_docs = collections.defaultdict(list)
254 262 resource = ApplicationService.by_id_cached()(resource_id)
255 263 resource = DBSession.merge(resource, load=False)
256 264 ns_pairs = []
257 265 for entry in dataset:
258 266 # gather pk and ns so we can remove older versions of row later
259 267 if entry['primary_key'] is not None:
260 268 ns_pairs.append({"pk": entry['primary_key'],
261 269 "ns": entry['namespace']})
262 270 log_entry = Log()
263 271 log_entry.set_data(entry, resource=resource)
264 272 log_entry._skip_ft_index = True
265 273 resource.logs.append(log_entry)
266 274 DBSession.flush()
267 275 # insert non pk rows first
268 276 if entry['primary_key'] is None:
269 277 es_docs[log_entry.partition_id].append(log_entry.es_doc())
270 278
271 279         # 2nd pass to delete all log entries from db for the same pk/ns pair
272 280 if ns_pairs:
273 281 ids_to_delete = []
274 282 es_docs = collections.defaultdict(list)
275 283 es_docs_to_delete = collections.defaultdict(list)
276 284 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
277 285 list_of_pairs=ns_pairs)
278 286 log_dict = {}
279 287 for log_entry in found_pkey_logs:
280 288 log_key = (log_entry.primary_key, log_entry.namespace)
281 289 if log_key not in log_dict:
282 290 log_dict[log_key] = []
283 291 log_dict[log_key].append(log_entry)
284 292
285 293 for ns, entry_list in log_dict.items():
286 294 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
287 295 # newest row needs to be indexed in es
288 296 log_entry = entry_list[-1]
289 297 # delete everything from pg and ES, leave the last row in pg
290 298 for e in entry_list[:-1]:
291 299 ids_to_delete.append(e.log_id)
292 300 es_docs_to_delete[e.partition_id].append(e.delete_hash)
293 301
294 302 es_docs_to_delete[log_entry.partition_id].append(
295 303 log_entry.delete_hash)
296 304
297 305 es_docs[log_entry.partition_id].append(log_entry.es_doc())
298 306
299 307 if ids_to_delete:
300 308 query = DBSession.query(Log).filter(
301 309 Log.log_id.in_(ids_to_delete))
302 310 query.delete(synchronize_session=False)
303 311 if es_docs_to_delete:
304 312 # batch this to avoid problems with default ES bulk limits
305 313 for es_index in es_docs_to_delete.keys():
306 314 for batch in in_batches(es_docs_to_delete[es_index], 20):
307 query = {'terms': {'delete_hash': batch}}
315 query = {"query": {'terms': {'delete_hash': batch}}}
308 316
309 317 try:
310 Datastores.es.delete_by_query(
311 es_index, 'log', query)
312 except pyelasticsearch.ElasticHttpNotFoundError as exc:
318 Datastores.es.transport.perform_request(
319 "DELETE", '/{}/{}/_query'.format(es_index, 'log'), body=query)
320 except elasticsearch.exceptions.NotFoundError as exc:
313 321 msg = 'skipping index {}'.format(es_index)
314 322 log.info(msg)
315 323
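Context for the change above: in Elasticsearch 2.x delete-by-query is no longer part of the core API (it ships as a separate plugin exposing the `DELETE /{index}/{type}/_query` endpoint), so the code drops down to `transport.perform_request` and wraps the terms filter in a full query body. A sketch of the same call, assuming that plugin is installed:

```python
import elasticsearch

def delete_by_query(es_client, index, doc_type, query_body):
    # assumes the ES 2.x delete-by-query plugin, which provides /_query
    try:
        return es_client.transport.perform_request(
            "DELETE", "/{}/{}/_query".format(index, doc_type), body=query_body)
    except elasticsearch.exceptions.NotFoundError:
        # the index may already have been rotated away - nothing to delete
        return None
```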
316 324 total_logs = len(dataset)
317 325
318 326 log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
319 327 str(resource),
320 328 total_logs,
321 329 proto_version)
322 330 log.info(log_msg)
323 331 # mark_changed(session)
324 332 redis_pipeline = Datastores.redis.pipeline(transaction=False)
325 333 key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
326 334 redis_pipeline.incr(key, total_logs)
327 335 redis_pipeline.expire(key, 3600 * 24)
328 336 key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
329 337 resource.owner_user_id, current_time)
330 338 redis_pipeline.incr(key, total_logs)
331 339 redis_pipeline.expire(key, 3600)
332 340 key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format(
333 341 resource_id, current_time.replace(minute=0))
334 342 redis_pipeline.incr(key, total_logs)
335 343 redis_pipeline.expire(key, 3600 * 24 * 7)
336 344 redis_pipeline.sadd(
337 345 REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
338 346 current_time.replace(minute=0)), resource_id)
339 347 redis_pipeline.execute()
340 348 add_logs_es(es_docs)
341 349 return True
342 350 except Exception as exc:
343 351 print_traceback(log)
344 352 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
345 353 raise
346 354 add_logs.retry(exc=exc)
347 355
348 356
349 357 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
350 358 def add_logs_es(es_docs):
351 359 for k, v in es_docs.items():
352 Datastores.es.bulk_index(k, 'log', v)
360 to_update = {'_index': k, '_type': 'log'}
361 [i.update(to_update) for i in v]
362 elasticsearch.helpers.bulk(Datastores.es, v)
353 363
354 364
355 365 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
356 366 def add_metrics(resource_id, request_params, dataset, proto_version):
357 367 current_time = datetime.utcnow().replace(second=0, microsecond=0)
358 368 try:
359 369 resource = ApplicationService.by_id_cached()(resource_id)
360 370 resource = DBSession.merge(resource, load=False)
361 371 es_docs = []
362 372 rows = []
363 373 for metric in dataset:
364 374 tags = dict(metric['tags'])
365 375 server_n = tags.get('server_name', metric['server_name']).lower()
366 376 tags['server_name'] = server_n or 'unknown'
367 377 new_metric = Metric(
368 378 timestamp=metric['timestamp'],
369 379 resource_id=resource.resource_id,
370 380 namespace=metric['namespace'],
371 381 tags=tags)
372 382 rows.append(new_metric)
373 383 es_docs.append(new_metric.es_doc())
374 384 session = DBSession()
375 385 session.bulk_save_objects(rows)
376 386 session.flush()
377 387
378 388 action = 'METRICS'
379 389 metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
380 390 action,
381 391 str(resource),
382 392 len(dataset),
383 393 proto_version
384 394 )
385 395 log.info(metrics_msg)
386 396
387 397 mark_changed(session)
388 398 redis_pipeline = Datastores.redis.pipeline(transaction=False)
389 399 key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
390 400 redis_pipeline.incr(key, len(rows))
391 401 redis_pipeline.expire(key, 3600 * 24)
392 402 key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
393 403 resource.owner_user_id, current_time)
394 404 redis_pipeline.incr(key, len(rows))
395 405 redis_pipeline.expire(key, 3600)
396 406 key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format(
397 407 resource_id, current_time.replace(minute=0))
398 408 redis_pipeline.incr(key, len(rows))
399 409 redis_pipeline.expire(key, 3600 * 24 * 7)
400 410 redis_pipeline.sadd(
401 411 REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
402 412 current_time.replace(minute=0)), resource_id)
403 413 redis_pipeline.execute()
404 414 add_metrics_es(es_docs)
405 415 return True
406 416 except Exception as exc:
407 417 print_traceback(log)
408 418 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
409 419 raise
410 420 add_metrics.retry(exc=exc)
411 421
412 422
413 423 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
414 424 def add_metrics_es(es_docs):
415 425 for doc in es_docs:
416 426 partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
417 427 Datastores.es.index(partition, 'log', doc)
418 428
419 429
420 430 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
421 431 def check_user_report_notifications(resource_id):
422 432 since_when = datetime.utcnow()
423 433 try:
424 434 request = get_current_request()
425 435 application = ApplicationService.by_id(resource_id)
426 436 if not application:
427 437 return
428 438 error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
429 439 ReportType.error, resource_id)
430 440 slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
431 441 ReportType.slow, resource_id)
432 442 error_group_ids = Datastores.redis.smembers(error_key)
433 443 slow_group_ids = Datastores.redis.smembers(slow_key)
434 444 Datastores.redis.delete(error_key)
435 445 Datastores.redis.delete(slow_key)
436 446 err_gids = [int(g_id) for g_id in error_group_ids]
437 447 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
438 448 group_ids = err_gids + slow_gids
439 449 occurence_dict = {}
440 450 for g_id in group_ids:
441 451 key = REDIS_KEYS['counters']['report_group_occurences'].format(
442 452 g_id)
443 453 val = Datastores.redis.get(key)
444 454 Datastores.redis.delete(key)
445 455 if val:
446 456 occurence_dict[g_id] = int(val)
447 457 else:
448 458 occurence_dict[g_id] = 1
449 459 report_groups = ReportGroupService.by_ids(group_ids)
450 460 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
451 461
452 462 ApplicationService.check_for_groups_alert(
453 463 application, 'alert', report_groups=report_groups,
454 464 occurence_dict=occurence_dict)
455 465 users = set([p.user for p in ResourceService.users_for_perm(application, 'view')])
456 466 report_groups = report_groups.all()
457 467 for user in users:
458 468 UserService.report_notify(user, request, application,
459 469 report_groups=report_groups,
460 470 occurence_dict=occurence_dict)
461 471 for group in report_groups:
462 472 # marks report_groups as notified
463 473 if not group.notified:
464 474 group.notified = True
465 475 except Exception as exc:
466 476 print_traceback(log)
467 477 raise
468 478
469 479
470 480 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
471 481 def check_alerts(resource_id):
472 482 since_when = datetime.utcnow()
473 483 try:
474 484 request = get_current_request()
475 485 application = ApplicationService.by_id(resource_id)
476 486 if not application:
477 487 return
478 488 error_key = REDIS_KEYS[
479 489 'reports_to_notify_per_type_per_app_alerting'].format(
480 490 ReportType.error, resource_id)
481 491 slow_key = REDIS_KEYS[
482 492 'reports_to_notify_per_type_per_app_alerting'].format(
483 493 ReportType.slow, resource_id)
484 494 error_group_ids = Datastores.redis.smembers(error_key)
485 495 slow_group_ids = Datastores.redis.smembers(slow_key)
486 496 Datastores.redis.delete(error_key)
487 497 Datastores.redis.delete(slow_key)
488 498 err_gids = [int(g_id) for g_id in error_group_ids]
489 499 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
490 500 group_ids = err_gids + slow_gids
491 501 occurence_dict = {}
492 502 for g_id in group_ids:
493 503 key = REDIS_KEYS['counters'][
494 504 'report_group_occurences_alerting'].format(
495 505 g_id)
496 506 val = Datastores.redis.get(key)
497 507 Datastores.redis.delete(key)
498 508 if val:
499 509 occurence_dict[g_id] = int(val)
500 510 else:
501 511 occurence_dict[g_id] = 1
502 512 report_groups = ReportGroupService.by_ids(group_ids)
503 513 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
504 514
505 515 ApplicationService.check_for_groups_alert(
506 516 application, 'alert', report_groups=report_groups,
507 517 occurence_dict=occurence_dict, since_when=since_when)
508 518 except Exception as exc:
509 519 print_traceback(log)
510 520 raise
511 521
512 522
513 523 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
514 524 def close_alerts():
515 525 log.warning('Checking alerts')
516 526 since_when = datetime.utcnow()
517 527 try:
518 528 event_types = [Event.types['error_report_alert'],
519 529 Event.types['slow_report_alert'], ]
520 530 statuses = [Event.statuses['active']]
521 531 # get events older than 5 min
522 532 events = EventService.by_type_and_status(
523 533 event_types,
524 534 statuses,
525 535 older_than=(since_when - timedelta(minutes=5)))
526 536 for event in events:
527 537 # see if we can close them
528 538 event.validate_or_close(
529 539 since_when=(since_when - timedelta(minutes=1)))
530 540 except Exception as exc:
531 541 print_traceback(log)
532 542 raise
533 543
534 544
535 545 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
536 546 def update_tag_counter(tag_name, tag_value, count):
537 547 try:
538 548 query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
539 549 sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
540 550 sa.types.TEXT))
541 551 query.update({'times_seen': Tag.times_seen + count,
542 552 'last_timestamp': datetime.utcnow()},
543 553 synchronize_session=False)
544 554 session = DBSession()
545 555 mark_changed(session)
546 556 return True
547 557 except Exception as exc:
548 558 print_traceback(log)
549 559 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
550 560 raise
551 561 update_tag_counter.retry(exc=exc)
552 562
553 563
554 564 @celery.task(queue="default")
555 565 def update_tag_counters():
556 566 """
557 567 Sets task to update counters for application tags
558 568 """
559 569 tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
560 570 Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
561 571 c = collections.Counter(tags)
562 572 for t_json, count in c.items():
563 573 tag_info = json.loads(t_json)
564 574 update_tag_counter.delay(tag_info[0], tag_info[1], count)
565 575
566 576
567 577 @celery.task(queue="default")
568 578 def daily_digest():
569 579 """
570 580 Sends daily digest with top 50 error reports
571 581 """
572 582 request = get_current_request()
573 583 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
574 584 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
575 585 since_when = datetime.utcnow() - timedelta(hours=8)
576 586 log.warning('Generating daily digests')
577 587 for resource_id in apps:
578 588 resource_id = resource_id.decode('utf8')
579 589 end_date = datetime.utcnow().replace(microsecond=0, second=0)
580 590 filter_settings = {'resource': [resource_id],
581 591 'tags': [{'name': 'type',
582 592 'value': ['error'], 'op': None}],
583 593 'type': 'error', 'start_date': since_when,
584 594 'end_date': end_date}
585 595
586 596 reports = ReportGroupService.get_trending(
587 597 request, filter_settings=filter_settings, limit=50)
588 598
589 599 application = ApplicationService.by_id(resource_id)
590 600 if application:
591 601 users = set([p.user for p in ResourceService.users_for_perm(application, 'view')])
592 602 for user in users:
593 603 user.send_digest(request, application, reports=reports,
594 604 since_when=since_when)
595 605
596 606
597 607 @celery.task(queue="default")
598 608 def notifications_reports():
599 609 """
600 610     Loop that checks redis for info and then issues new Celery tasks that
601 611     send the notifications
602 612 """
603 613 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
604 614 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
605 615 for app in apps:
606 616 log.warning('Notify for app: %s' % app)
607 617 check_user_report_notifications.delay(app.decode('utf8'))
608 618
609 619 @celery.task(queue="default")
610 620 def alerting_reports():
611 621 """
612 622     Loop that checks redis for info and then issues new Celery tasks to
613 623     perform the following:
614 624     - determine which applications should have new alerts opened
615 625 """
616 626
617 627 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
618 628 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
619 629 for app in apps:
620 630 log.warning('Notify for app: %s' % app)
621 631 check_alerts.delay(app.decode('utf8'))
622 632
623 633
624 634 @celery.task(queue="default", soft_time_limit=3600 * 4,
625 635 hard_time_limit=3600 * 4, max_retries=144)
626 636 def logs_cleanup(resource_id, filter_settings):
627 637 request = get_current_request()
628 638 request.tm.begin()
629 639 es_query = {
630 "_source": False,
631 "size": 5000,
632 640 "query": {
633 641 "filtered": {
634 642 "filter": {
635 643 "and": [{"term": {"resource_id": resource_id}}]
636 644 }
637 645 }
638 646 }
639 647 }
640 648
641 649 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
642 650 if filter_settings['namespace']:
643 651 query = query.filter(Log.namespace == filter_settings['namespace'][0])
644 652 es_query['query']['filtered']['filter']['and'].append(
645 653 {"term": {"namespace": filter_settings['namespace'][0]}}
646 654 )
647 655 query.delete(synchronize_session=False)
648 656 request.tm.commit()
649 result = request.es_conn.search(es_query, index='rcae_l_*',
650 doc_type='log', es_scroll='1m',
651 es_search_type='scan')
652 scroll_id = result['_scroll_id']
653 while True:
654 log.warning('log_cleanup, app:{} ns:{} batch'.format(
655 resource_id,
656 filter_settings['namespace']
657 ))
658 es_docs_to_delete = []
659 result = request.es_conn.send_request(
660 'POST', ['_search', 'scroll'],
661 body=scroll_id, query_params={"scroll": '1m'})
662 scroll_id = result['_scroll_id']
663 if not result['hits']['hits']:
664 break
665 for doc in result['hits']['hits']:
666 es_docs_to_delete.append({"id": doc['_id'],
667 "index": doc['_index']})
668
669 for batch in in_batches(es_docs_to_delete, 10):
670 Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
671 **to_del)
672 for to_del in batch])
657 Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format('rcae_l_*', 'log'), body=es_query)
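With delete-by-query available, the previous scan/scroll loop that collected document ids and issued batched bulk deletes collapses into the single transport call above against the `rcae_l_*` index pattern; the `_source`/`size` keys are dropped because no scroll results are fetched any more. Condensed into a self-contained helper (name and signature are illustrative, same plugin assumption as above):

```python
def cleanup_logs_es(es_client, resource_id, namespace=None):
    # same filtered query the task builds above; the namespace term is optional
    and_filter = [{"term": {"resource_id": resource_id}}]
    if namespace:
        and_filter.append({"term": {"namespace": namespace}})
    es_query = {"query": {"filtered": {"filter": {"and": and_filter}}}}
    es_client.transport.perform_request(
        "DELETE", "/{}/{}/_query".format("rcae_l_*", "log"), body=es_query)
```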
@@ -1,81 +1,80 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import datetime
18 18 import logging
19 19
20 20 from pyramid.httpexceptions import HTTPForbidden, HTTPTooManyRequests
21 21
22 from appenlight.models import Datastores
23 22 from appenlight.models.services.config import ConfigService
24 23 from appenlight.lib.redis_keys import REDIS_KEYS
25 24
26 25 log = logging.getLogger(__name__)
27 26
28 27
29 28 def rate_limiting(request, resource, section, to_increment=1):
30 29 tsample = datetime.datetime.utcnow().replace(second=0, microsecond=0)
31 30 key = REDIS_KEYS['rate_limits'][section].format(tsample,
32 31 resource.resource_id)
33 32 redis_pipeline = request.registry.redis_conn.pipeline()
34 33 redis_pipeline.incr(key, to_increment)
35 34 redis_pipeline.expire(key, 3600 * 24)
36 35 results = redis_pipeline.execute()
37 36 current_count = results[0]
38 37 config = ConfigService.by_key_and_section(section, 'global')
39 38 limit = config.value if config else 1000
40 39 if current_count > int(limit):
41 40 log.info('RATE LIMITING: {}: {}, {}'.format(
42 41 section, resource, current_count))
43 42 abort_msg = 'Rate limits are in effect for this application'
44 43 raise HTTPTooManyRequests(abort_msg,
45 44 headers={'X-AppEnlight': abort_msg})
46 45
47 46
48 47 def check_cors(request, application, should_return=True):
49 48 """
50 49     Checks and validates that the request comes from an authorized domain for the
51 50     application, otherwise returns 403
52 51 """
53 52 origin_found = False
54 53 origin = request.headers.get('Origin')
55 54 if should_return:
56 55 log.info('CORS for %s' % origin)
57 56 if not origin:
58 57 return False
59 58 for domain in application.domains.split('\n'):
60 59 if domain in origin:
61 60 origin_found = True
62 61 if origin_found:
63 62 request.response.headers.add('Access-Control-Allow-Origin', origin)
64 63 request.response.headers.add('XDomainRequestAllowed', '1')
65 64 request.response.headers.add('Access-Control-Allow-Methods',
66 65 'GET, POST, OPTIONS')
67 66 request.response.headers.add('Access-Control-Allow-Headers',
68 67 'Accept-Encoding, Accept-Language, '
69 68 'Content-Type, '
70 69 'Depth, User-Agent, X-File-Size, '
71 70 'X-Requested-With, If-Modified-Since, '
72 71 'X-File-Name, '
73 72 'Cache-Control, Host, Pragma, Accept, '
74 73 'Origin, Connection, '
75 74 'Referer, Cookie, '
76 75 'X-appenlight-public-api-key, '
77 76 'x-appenlight-public-api-key')
78 77 request.response.headers.add('Access-Control-Max-Age', '86400')
79 78 return request.response
80 79 else:
81 80 return HTTPForbidden()
@@ -1,491 +1,491 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 """
18 18 Utility functions.
19 19 """
20 20 import logging
21 21 import requests
22 22 import hashlib
23 23 import json
24 24 import copy
25 25 import uuid
26 26 import appenlight.lib.helpers as h
27 27 from collections import namedtuple
28 28 from datetime import timedelta, datetime, date
29 29 from dogpile.cache.api import NO_VALUE
30 30 from appenlight.models import Datastores
31 31 from appenlight.validators import (LogSearchSchema,
32 32 TagListSchema,
33 33 accepted_search_params)
34 34 from itsdangerous import TimestampSigner
35 35 from ziggurat_foundations.permissions import ALL_PERMISSIONS
36 36 from ziggurat_foundations.models.services.user import UserService
37 37 from dateutil.relativedelta import relativedelta
38 38 from dateutil.rrule import rrule, MONTHLY, DAILY
39 39
40 40 log = logging.getLogger(__name__)
41 41
42 42
43 43 Stat = namedtuple('Stat', 'start_interval value')
44 44
45 45
46 46 def default_extractor(item):
47 47 """
48 48 :param item - item to extract date from
49 49 """
50 50 if hasattr(item, 'start_interval'):
51 51 return item.start_interval
52 52 return item['start_interval']
53 53
54 54
55 55 # fast gap generator
56 56 def gap_gen_default(start, step, itemiterator, end_time=None,
57 57 iv_extractor=None):
58 58 """ generates a list of time/value items based on step and itemiterator
59 59 if there are entries missing from iterator time/None will be returned
60 60 instead
61 61 :param start - datetime - what time should we start generating our values
62 62 :param step - timedelta - stepsize
63 63 :param itemiterator - iterable - we will check this iterable for values
64 64 corresponding to generated steps
65 65 :param end_time - datetime - when last step is >= end_time stop iterating
66 66 :param iv_extractor - extracts current step from iterable items
67 67 """
68 68
69 69 if not iv_extractor:
70 70 iv_extractor = default_extractor
71 71
72 72 next_step = start
73 73 minutes = step.total_seconds() / 60.0
74 74 while next_step.minute % minutes != 0:
75 75 next_step = next_step.replace(minute=next_step.minute - 1)
76 76 for item in itemiterator:
77 77 item_start_interval = iv_extractor(item)
78 78 # do we have a match for current time step in our data?
79 79 # no gen a new tuple with 0 values
80 80 while next_step < item_start_interval:
81 81 yield Stat(next_step, None)
82 82 next_step = next_step + step
83 83 if next_step == item_start_interval:
84 84 yield Stat(item_start_interval, item)
85 85 next_step = next_step + step
86 86 if end_time:
87 87 while next_step < end_time:
88 88 yield Stat(next_step, None)
89 89 next_step = next_step + step
90 90
91 91
92 92 class DateTimeEncoder(json.JSONEncoder):
93 93 """ Simple datetime to ISO encoder for json serialization"""
94 94
95 95 def default(self, obj):
96 96 if isinstance(obj, date):
97 97 return obj.isoformat()
98 98 if isinstance(obj, datetime):
99 99 return obj.isoformat()
100 100 return json.JSONEncoder.default(self, obj)
101 101
102 102
103 103 def channelstream_request(secret, endpoint, payload, throw_exceptions=False,
104 104 servers=None):
105 105 responses = []
106 106 if not servers:
107 107 servers = []
108 108
109 109 signer = TimestampSigner(secret)
110 110 sig_for_server = signer.sign(endpoint)
111 111 for secret, server in [(s['secret'], s['server']) for s in servers]:
112 112 response = {}
113 113 secret_headers = {'x-channelstream-secret': sig_for_server,
114 114 'x-channelstream-endpoint': endpoint,
115 115 'Content-Type': 'application/json'}
116 116 url = '%s%s' % (server, endpoint)
117 117 try:
118 118 response = requests.post(url,
119 119 data=json.dumps(payload,
120 120 cls=DateTimeEncoder),
121 121 headers=secret_headers,
122 122 verify=False,
123 123 timeout=2).json()
124 124 except requests.exceptions.RequestException as e:
125 125 if throw_exceptions:
126 126 raise
127 127 responses.append(response)
128 128 return responses
129 129
130 130
131 131 def add_cors_headers(response):
132 132 # allow CORS
133 133 response.headers.add('Access-Control-Allow-Origin', '*')
134 134 response.headers.add('XDomainRequestAllowed', '1')
135 135 response.headers.add('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
136 136 # response.headers.add('Access-Control-Allow-Credentials', 'true')
137 137 response.headers.add('Access-Control-Allow-Headers',
138 138 'Content-Type, Depth, User-Agent, X-File-Size, X-Requested-With, If-Modified-Since, X-File-Name, Cache-Control, Pragma, Origin, Connection, Referer, Cookie')
139 139 response.headers.add('Access-Control-Max-Age', '86400')
140 140
141 141
142 142 from sqlalchemy.sql import compiler
143 143 from psycopg2.extensions import adapt as sqlescape
144 144
145 145
146 146 # or use the appropriate escape function from your db driver
147 147
148 148 def compile_query(query):
149 149 dialect = query.session.bind.dialect
150 150 statement = query.statement
151 151 comp = compiler.SQLCompiler(dialect, statement)
152 152 comp.compile()
153 153 enc = dialect.encoding
154 154 params = {}
155 155 for k, v in comp.params.items():
156 156 if isinstance(v, str):
157 157 v = v.encode(enc)
158 158 params[k] = sqlescape(v)
159 159 return (comp.string.encode(enc) % params).decode(enc)
160 160
161 161
162 162 def convert_es_type(input_data):
163 163 """
164 164 This might need to convert some text or other types to corresponding ES types
165 165 """
166 166 return str(input_data)
167 167
168 168
169 169 ProtoVersion = namedtuple('ProtoVersion', ['major', 'minor', 'patch'])
170 170
171 171
172 172 def parse_proto(input_data):
173 173 try:
174 174 parts = [int(x) for x in input_data.split('.')]
175 175 while len(parts) < 3:
176 176 parts.append(0)
177 177 return ProtoVersion(*parts)
178 178 except Exception as e:
179 179 log.info('Unknown protocol version: %s' % e)
180 180 return ProtoVersion(99, 99, 99)
181 181
182 182
183 183 def es_index_name_limiter(start_date=None, end_date=None, months_in_past=6,
184 184 ixtypes=None):
185 185 """
186 186 This function limits the search to 6 months by default so we don't have to
187 187 query 300 elasticsearch indices for 20 years of historical data for example
188 188 """
189 189
190 190 # should be cached later
191 191 def get_possible_names():
192 return list(Datastores.es.aliases().keys())
192 return list(Datastores.es.indices.get_alias('*'))
193 193
194 194 possible_names = get_possible_names()
195 195 es_index_types = []
196 196 if not ixtypes:
197 197 ixtypes = ['reports', 'metrics', 'logs']
198 198 for t in ixtypes:
199 199 if t == 'reports':
200 200 es_index_types.append('rcae_r_%s')
201 201 elif t == 'logs':
202 202 es_index_types.append('rcae_l_%s')
203 203 elif t == 'metrics':
204 204 es_index_types.append('rcae_m_%s')
205 205 elif t == 'uptime':
206 206 es_index_types.append('rcae_u_%s')
207 207 elif t == 'slow_calls':
208 208 es_index_types.append('rcae_sc_%s')
209 209
210 210 if start_date:
211 211 start_date = copy.copy(start_date)
212 212 else:
213 213 if not end_date:
214 214 end_date = datetime.utcnow()
215 215 start_date = end_date + relativedelta(months=months_in_past * -1)
216 216
217 217 if not end_date:
218 218 end_date = start_date + relativedelta(months=months_in_past)
219 219
220 220 index_dates = list(rrule(MONTHLY,
221 221 dtstart=start_date.date().replace(day=1),
222 222 until=end_date.date(),
223 223 count=36))
224 224 index_names = []
225 225 for ix_type in es_index_types:
226 226 to_extend = [ix_type % d.strftime('%Y_%m') for d in index_dates
227 227 if ix_type % d.strftime('%Y_%m') in possible_names]
228 228 index_names.extend(to_extend)
229 229 for day in list(rrule(DAILY, dtstart=start_date.date(),
230 230 until=end_date.date(), count=366)):
231 231 ix_name = ix_type % day.strftime('%Y_%m_%d')
232 232 if ix_name in possible_names:
233 233 index_names.append(ix_name)
234 234 return index_names
235 235
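In the index-name limiter above, pyelasticsearch's `aliases()` is replaced by the official client's `indices.get_alias('*')`, which returns a dict keyed by concrete index name - listing its keys yields every index the limiter can match against. A brief sketch (the node URL is a placeholder):

```python
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://127.0.0.1:9200"])  # placeholder node
names = list(es.indices.get_alias('*'))        # e.g. ['rcae_l_2017_01', ...]
report_indices = [n for n in names if n.startswith('rcae_r_')]
```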
236 236
237 237 def build_filter_settings_from_query_dict(
238 238 request, params=None, override_app_ids=None,
239 239 resource_permissions=None):
240 240 """
241 241 Builds list of normalized search terms for ES from query params
242 242 ensuring application list is restricted to only applications user
243 243 has access to
244 244
245 245 :param params (dictionary)
246 246 :param override_app_ids - list of application id's to use instead of
247 247 applications user normally has access to
248 248 """
249 249 params = copy.deepcopy(params)
250 250 applications = []
251 251 if not resource_permissions:
252 252 resource_permissions = ['view']
253 253
254 254 if request.user:
255 255 applications = UserService.resources_with_perms(
256 256 request.user, resource_permissions, resource_types=['application'])
257 257
258 258 # CRITICAL - this ensures our resultset is limited to only the ones
259 259 # user has view permissions
260 260 all_possible_app_ids = set([app.resource_id for app in applications])
261 261
262 262 # if override is preset we force permission for app to be present
263 263 # this allows users to see dashboards and applications they would
264 264 # normally not be able to
265 265
266 266 if override_app_ids:
267 267 all_possible_app_ids = set(override_app_ids)
268 268
269 269 schema = LogSearchSchema().bind(resources=all_possible_app_ids)
270 270 tag_schema = TagListSchema()
271 271 filter_settings = schema.deserialize(params)
272 272 tag_list = []
273 273 for k, v in list(filter_settings.items()):
274 274 if k in accepted_search_params:
275 275 continue
276 276 tag_list.append({"name": k, "value": v, "op": 'eq'})
277 277 # remove the key from filter_settings
278 278 filter_settings.pop(k, None)
279 279 tags = tag_schema.deserialize(tag_list)
280 280 filter_settings['tags'] = tags
281 281 return filter_settings
282 282
283 283
284 284 def gen_uuid():
285 285 return str(uuid.uuid4())
286 286
287 287
288 288 def gen_uuid4_sha_hex():
289 289 return hashlib.sha1(uuid.uuid4().bytes).hexdigest()
290 290
291 291
292 292 def permission_tuple_to_dict(data):
293 293 out = {
294 294 "user_name": None,
295 295 "perm_name": data.perm_name,
296 296 "owner": data.owner,
297 297 "type": data.type,
298 298 "resource_name": None,
299 299 "resource_type": None,
300 300 "resource_id": None,
301 301 "group_name": None,
302 302 "group_id": None
303 303 }
304 304 if data.user:
305 305 out["user_name"] = data.user.user_name
306 306 if data.perm_name == ALL_PERMISSIONS:
307 307 out['perm_name'] = '__all_permissions__'
308 308 if data.resource:
309 309 out['resource_name'] = data.resource.resource_name
310 310 out['resource_type'] = data.resource.resource_type
311 311 out['resource_id'] = data.resource.resource_id
312 312 if data.group:
313 313 out['group_name'] = data.group.group_name
314 314 out['group_id'] = data.group.id
315 315 return out
316 316
317 317
318 318 def get_cached_buckets(request, stats_since, end_time, fn, cache_key,
319 319 gap_gen=None, db_session=None, step_interval=None,
320 320 iv_extractor=None,
321 321 rerange=False, *args, **kwargs):
322 322 """ Takes "fn" that should return some data and tries to load the data
323 323 dividing it into daily buckets - if the stats_since and end time give a
324 324     delta bigger than 24 hours, then only "today's" data is computed on the fly
325 325
326 326 :param request: (request) request object
327 327 :param stats_since: (datetime) start date of buckets range
328 328 :param end_time: (datetime) end date of buckets range - utcnow() if None
329 329 :param fn: (callable) callable to use to populate buckets should have
330 330 following signature:
331 331 def get_data(request, since_when, until, *args, **kwargs):
332 332
333 333 :param cache_key: (string) cache key that will be used to build bucket
334 334 caches
335 335 :param gap_gen: (callable) gap generator - should return step intervals
336 336 to use with out `fn` callable
337 337 :param db_session: (Session) sqlalchemy session
338 338 :param step_interval: (timedelta) optional step interval if we want to
339 339 override the default determined from total start/end time delta
340 340 :param iv_extractor: (callable) used to get step intervals from data
341 341 returned by `fn` callable
342 342 :param rerange: (bool) handy if we want to change ranges from hours to
343 343 days when cached data is missing - will shorten execution time if `fn`
344 344 callable supports that and we are working with multiple rows - like metrics
345 345 :param args:
346 346 :param kwargs:
347 347
348 348 :return: iterable
349 349 """
350 350 if not end_time:
351 351 end_time = datetime.utcnow().replace(second=0, microsecond=0)
352 352 delta = end_time - stats_since
353 353 # if smaller than 3 days we want to group by 5min else by 1h,
354 354 # for 60 min group by min
355 355 if not gap_gen:
356 356 gap_gen = gap_gen_default
357 357 if not iv_extractor:
358 358 iv_extractor = default_extractor
359 359
360 360 # do not use custom interval if total time range with new iv would exceed
361 361 # end time
362 362 if not step_interval or stats_since + step_interval >= end_time:
363 363 if delta < h.time_deltas.get('12h')['delta']:
364 364 step_interval = timedelta(seconds=60)
365 365 elif delta < h.time_deltas.get('3d')['delta']:
366 366 step_interval = timedelta(seconds=60 * 5)
367 367 elif delta > h.time_deltas.get('2w')['delta']:
368 368 step_interval = timedelta(days=1)
369 369 else:
370 370 step_interval = timedelta(minutes=60)
371 371
372 372 if step_interval >= timedelta(minutes=60):
373 373 log.info('cached_buckets:{}: adjusting start time '
374 374 'for hourly or daily intervals'.format(cache_key))
375 375 stats_since = stats_since.replace(hour=0, minute=0)
376 376
377 377 ranges = [i.start_interval for i in list(gap_gen(stats_since,
378 378 step_interval, [],
379 379 end_time=end_time))]
380 380 buckets = {}
381 381 storage_key = 'buckets:' + cache_key + '{}|{}'
382 382 # this means we basically cache per hour in 3-14 day intervals but I think
383 383 # it's fine at this point - will be faster than db access anyway
384 384
385 385 if len(ranges) >= 1:
386 386 last_ranges = [ranges[-1]]
387 387 else:
388 388 last_ranges = []
389 389 if step_interval >= timedelta(minutes=60):
390 390 for r in ranges:
391 391 k = storage_key.format(step_interval.total_seconds(), r)
392 392 value = request.registry.cache_regions.redis_day_30.get(k)
393 393 # last buckets are never loaded from cache
394 394 is_last_result = (
395 395 r >= end_time - timedelta(hours=6) or r in last_ranges)
396 396 if value is not NO_VALUE and not is_last_result:
397 397 log.info("cached_buckets:{}: "
398 398 "loading range {} from cache".format(cache_key, r))
399 399 buckets[r] = value
400 400 else:
401 401 log.info("cached_buckets:{}: "
402 402 "loading range {} from storage".format(cache_key, r))
403 403 range_size = step_interval
404 404 if (step_interval == timedelta(minutes=60) and
405 405 not is_last_result and rerange):
406 406 range_size = timedelta(days=1)
407 407 r = r.replace(hour=0, minute=0)
408 408 log.info("cached_buckets:{}: "
409 409 "loading collapsed "
410 410 "range {} {}".format(cache_key, r,
411 411 r + range_size))
412 412 bucket_data = fn(
413 413 request, r, r + range_size, step_interval,
414 414 gap_gen, bucket_count=len(ranges), *args, **kwargs)
415 415 for b in bucket_data:
416 416 b_iv = iv_extractor(b)
417 417 buckets[b_iv] = b
418 418 k2 = storage_key.format(
419 419 step_interval.total_seconds(), b_iv)
420 420 request.registry.cache_regions.redis_day_30.set(k2, b)
421 421 log.info("cached_buckets:{}: saving cache".format(cache_key))
422 422 else:
423 423 # bucket count is 1 for short time ranges <= 24h from now
424 424 bucket_data = fn(request, stats_since, end_time, step_interval,
425 425 gap_gen, bucket_count=1, *args, **kwargs)
426 426 for b in bucket_data:
427 427 buckets[iv_extractor(b)] = b
428 428 return buckets
429 429
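A minimal usage sketch of get_cached_buckets, assuming a Pyramid request with the redis cache regions configured; count_rows and the 'demo_metric:42' cache key are hypothetical, and an explicit iv_extractor is passed so the sketch does not rely on default_extractor's internals:

def count_rows(request, since_when, until, step_interval, gap_gen,
               bucket_count=1, **kwargs):
    # hypothetical "fn": one dict per interval, keyed so the extractor
    # below can recover the interval start
    return [{'interval': since_when, 'count': 0}]

buckets = get_cached_buckets(
    request, datetime.utcnow() - timedelta(days=7), None,
    count_rows, cache_key='demo_metric:42',
    iv_extractor=lambda row: row['interval'])
# buckets maps each interval start to the row fn() returned for it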
430 430
431 431 def get_cached_split_data(request, stats_since, end_time, fn, cache_key,
432 432 db_session=None, *args, **kwargs):
433 433 """ Takes "fn" that should return some data and tries to load the data
434 434 dividing it into 2 buckets - a cached "since_from" bucket and a "today"
435 435 bucket - then the data can be reduced into a single value
436 436 
437 437 Data is cached if stats_since and end_time give a delta bigger
438 438 than 24 hours - then only the last 24h is computed on the fly
439 439 """
440 440 if not end_time:
441 441 end_time = datetime.utcnow().replace(second=0, microsecond=0)
442 442 delta = end_time - stats_since
443 443
444 444 if delta >= timedelta(minutes=60):
445 445 log.info('cached_split_data:{}: adjusting start time '
446 446 'for hourly or daily intervals'.format(cache_key))
447 447 stats_since = stats_since.replace(hour=0, minute=0)
448 448
449 449 storage_key = 'buckets_split_data:' + cache_key + ':{}|{}'
450 450 old_end_time = end_time.replace(hour=0, minute=0)
451 451
452 452 final_storage_key = storage_key.format(delta.total_seconds(),
453 453 old_end_time)
454 454 older_data = None
455 455
456 456 cdata = request.registry.cache_regions.redis_day_7.get(
457 457 final_storage_key)
458 458
459 459 if cdata:
460 460 log.info("cached_split_data:{}: found old "
461 461 "bucket data".format(cache_key))
462 462 older_data = cdata
463 463
464 464 if (stats_since < end_time - h.time_deltas.get('24h')['delta'] and
465 465 not cdata):
466 466 log.info("cached_split_data:{}: didn't find the "
467 467 "start bucket in cache so load older data".format(cache_key))
468 468 recent_stats_since = old_end_time
469 469 older_data = fn(request, stats_since, recent_stats_since,
470 470 db_session=db_session, *args, **kwargs)
471 471 request.registry.cache_regions.redis_day_7.set(final_storage_key,
472 472 older_data)
473 473 elif stats_since < end_time - h.time_deltas.get('24h')['delta']:
474 474 recent_stats_since = old_end_time
475 475 else:
476 476 recent_stats_since = stats_since
477 477
478 478 log.info("cached_split_data:{}: loading fresh "
479 479 "data bucksts from last 24h ".format(cache_key))
480 480 todays_data = fn(request, recent_stats_since, end_time,
481 481 db_session=db_session, *args, **kwargs)
482 482 return older_data, todays_data
483 483
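A sketch of reducing the two buckets returned by get_cached_split_data into one number; count_reports and the cache key are hypothetical, and the summation assumes fn returns a numeric value per bucket:

def count_reports(request, since_when, until, db_session=None, **kwargs):
    return 0  # hypothetical: one number per time bucket

older, today = get_cached_split_data(
    request, stats_since, end_time, count_reports, 'reports_total:42')
total = (older or 0) + (today or 0)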
484 484
485 485 def in_batches(seq, size):
486 486 """
487 487 Splits a sliceable sequence into batches of the specified size
488 488 :param seq: (sequence) sequence that supports len() and slicing
489 489 :param size: (int) batch size
490 490 """
491 491 return (seq[pos:pos + size] for pos in range(0, len(seq), size))
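A quick usage example; note that despite the original docstring the argument must be a sliceable sequence (list, tuple) rather than an arbitrary iterable, since the generator relies on len() and slicing:

rows = list(range(10))
for batch in in_batches(rows, 3):
    print(batch)  # [0, 1, 2], [3, 4, 5], [6, 7, 8], [9]
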
@@ -1,130 +1,130 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import logging
18 18
19 19 from sqlalchemy.ext.declarative import declarative_base
20 20 from sqlalchemy import MetaData
21 21 from sqlalchemy.orm import scoped_session
22 22 from sqlalchemy.orm import sessionmaker
23 23 from zope.sqlalchemy import ZopeTransactionExtension
24 24 import ziggurat_foundations
25 25 from ziggurat_foundations.models.base import get_db_session
26 26
27 27 log = logging.getLogger(__name__)
28 28
29 29 DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
30 30
31 31 NAMING_CONVENTION = {
32 32 "ix": 'ix_%(column_0_label)s',
33 33 "uq": "uq_%(table_name)s_%(column_0_name)s",
34 34 "ck": "ck_%(table_name)s_%(constraint_name)s",
35 35 "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
36 36 "pk": "pk_%(table_name)s"
37 37 }
38 38
39 39 metadata = MetaData(naming_convention=NAMING_CONVENTION)
40 40 Base = declarative_base(metadata=metadata)
41 41
42 42 # optional for request.db approach
43 43 ziggurat_foundations.models.DBSession = DBSession
44 44
45 45
46 46 class Datastores(object):
47 47 redis = None
48 48 es = None
49 49
50 50
51 51 def register_datastores(es_conn, redis_conn, redis_lockmgr):
52 52 Datastores.es = es_conn
53 53 Datastores.redis = redis_conn
54 54 Datastores.lockmgr = redis_lockmgr
55 55
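With this change Datastores.es is expected to hold an elasticsearch-py client instead of a pyelasticsearch one. A minimal wiring sketch, assuming a single local node; the real host list and the redis objects come from the application configuration:

from elasticsearch import Elasticsearch

es_conn = Elasticsearch(['http://127.0.0.1:9200'])  # assumed host
register_datastores(es_conn, redis_conn, redis_lockmgr)  # redis objects built elsewhere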
56 56
57 57 class SliceableESQuery(object):
58 58 def __init__(self, query, sort_query=None, aggregations=False, **kwconfig):
59 59 self.query = query
60 60 self.sort_query = sort_query
61 61 self.aggregations = aggregations
62 62 self.items_per_page = kwconfig.pop('items_per_page', 10)
63 63 self.page = kwconfig.pop('page', 1)
64 64 self.kwconfig = kwconfig
65 65 self.result = None
66 66
67 67 def __getitem__(self, index):
68 68 config = self.kwconfig.copy()
69 config['es_from'] = index.start
69 config['from_'] = index.start
70 70 query = self.query.copy()
71 71 if self.sort_query:
72 72 query.update(self.sort_query)
73 self.result = Datastores.es.search(query, size=self.items_per_page,
73 self.result = Datastores.es.search(body=query, size=self.items_per_page,
74 74 **config)
75 75 if self.aggregations:
76 76 self.items = self.result.get('aggregations')
77 77 else:
78 78 self.items = self.result['hits']['hits']
79 79
80 80 return self.items
81 81
82 82 def __iter__(self):
83 83 return self.result
84 84
85 85 def __len__(self):
86 86 config = self.kwconfig.copy()
87 87 query = self.query.copy()
88 self.result = Datastores.es.search(query, size=self.items_per_page,
88 self.result = Datastores.es.search(body=query, size=self.items_per_page,
89 89 **config)
90 90 if self.aggregations:
91 91 self.items = self.result.get('aggregations')
92 92 else:
93 93 self.items = self.result['hits']['hits']
94 94
95 95 count = int(self.result['hits']['total'])
96 96 return count if count < 5000 else 5000
97 97
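The edits above follow the elasticsearch-py search() signature: the query document is passed as body= and the result offset as from_ (pyelasticsearch's es_from keyword no longer exists). A sketch of an equivalent direct call; the index name and query are placeholders:

result = Datastores.es.search(
    index='rcae_r_2017_01',              # placeholder index
    doc_type='report',
    body={'query': {'match_all': {}}},
    size=10,
    from_=20)                            # was es_from= before the migration
hits = result['hits']['hits']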
98 98
99 99 from appenlight.models.resource import Resource
100 100 from appenlight.models.application import Application
101 101 from appenlight.models.user import User
102 102 from appenlight.models.alert_channel import AlertChannel
103 103 from appenlight.models.alert_channel_action import AlertChannelAction
104 104 from appenlight.models.metric import Metric
105 105 from appenlight.models.application_postprocess_conf import \
106 106 ApplicationPostprocessConf
107 107 from appenlight.models.auth_token import AuthToken
108 108 from appenlight.models.event import Event
109 109 from appenlight.models.external_identity import ExternalIdentity
110 110 from appenlight.models.group import Group
111 111 from appenlight.models.group_permission import GroupPermission
112 112 from appenlight.models.group_resource_permission import GroupResourcePermission
113 113 from appenlight.models.log import Log
114 114 from appenlight.models.plugin_config import PluginConfig
115 115 from appenlight.models.report import Report
116 116 from appenlight.models.report_group import ReportGroup
117 117 from appenlight.models.report_comment import ReportComment
118 118 from appenlight.models.report_assignment import ReportAssignment
119 119 from appenlight.models.report_stat import ReportStat
120 120 from appenlight.models.slow_call import SlowCall
121 121 from appenlight.models.tag import Tag
122 122 from appenlight.models.user_group import UserGroup
123 123 from appenlight.models.user_permission import UserPermission
124 124 from appenlight.models.user_resource_permission import UserResourcePermission
125 125 from ziggurat_foundations import ziggurat_model_init
126 126
127 127 ziggurat_model_init(User, Group, UserGroup, GroupPermission, UserPermission,
128 128 UserResourcePermission, GroupResourcePermission,
129 129 Resource,
130 130 ExternalIdentity, passwordmanager=None)
@@ -1,514 +1,514 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from datetime import datetime, timedelta
18 18 import math
19 19 import uuid
20 20 import hashlib
21 21 import copy
22 22 import urllib.parse
23 23 import logging
24 24 import sqlalchemy as sa
25 25
26 26 from appenlight.models import Base, Datastores
27 27 from appenlight.lib.utils.date_utils import convert_date
28 28 from appenlight.lib.utils import convert_es_type
29 29 from appenlight.models.slow_call import SlowCall
30 30 from appenlight.lib.utils import channelstream_request
31 31 from appenlight.lib.enums import ReportType, Language
32 32 from pyramid.threadlocal import get_current_registry, get_current_request
33 33 from sqlalchemy.dialects.postgresql import JSON
34 34 from ziggurat_foundations.models.base import BaseModel
35 35
36 36 log = logging.getLogger(__name__)
37 37
38 38 REPORT_TYPE_MATRIX = {
39 39 'http_status': {"type": 'int',
40 40 "ops": ('eq', 'ne', 'ge', 'le',)},
41 41 'group:priority': {"type": 'int',
42 42 "ops": ('eq', 'ne', 'ge', 'le',)},
43 43 'duration': {"type": 'float',
44 44 "ops": ('ge', 'le',)},
45 45 'url_domain': {"type": 'unicode',
46 46 "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
47 47 'url_path': {"type": 'unicode',
48 48 "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
49 49 'error': {"type": 'unicode',
50 50 "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
51 51 'tags:server_name': {"type": 'unicode',
52 52 "ops": ('eq', 'ne', 'startswith', 'endswith',
53 53 'contains',)},
54 54 'traceback': {"type": 'unicode',
55 55 "ops": ('contains',)},
56 56 'group:occurences': {"type": 'int',
57 57 "ops": ('eq', 'ne', 'ge', 'le',)}
58 58 }
59 59
60 60
61 61 class Report(Base, BaseModel):
62 62 __tablename__ = 'reports'
63 63 __table_args__ = {'implicit_returning': False}
64 64
65 65 id = sa.Column(sa.Integer, nullable=False, primary_key=True)
66 66 group_id = sa.Column(sa.BigInteger,
67 67 sa.ForeignKey('reports_groups.id', ondelete='cascade',
68 68 onupdate='cascade'))
69 69 resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
70 70 report_type = sa.Column(sa.Integer(), nullable=False, index=True)
71 71 error = sa.Column(sa.UnicodeText(), index=True)
72 72 extra = sa.Column(JSON(), default={})
73 73 request = sa.Column(JSON(), nullable=False, default={})
74 74 ip = sa.Column(sa.String(39), index=True, default='')
75 75 username = sa.Column(sa.Unicode(255), default='')
76 76 user_agent = sa.Column(sa.Unicode(255), default='')
77 77 url = sa.Column(sa.UnicodeText(), index=True)
78 78 request_id = sa.Column(sa.Text())
79 79 request_stats = sa.Column(JSON(), nullable=False, default={})
80 80 traceback = sa.Column(JSON(), nullable=False, default=None)
81 81 traceback_hash = sa.Column(sa.Text())
82 82 start_time = sa.Column(sa.DateTime(), default=datetime.utcnow,
83 83 server_default=sa.func.now())
84 84 end_time = sa.Column(sa.DateTime())
85 85 duration = sa.Column(sa.Float, default=0)
86 86 http_status = sa.Column(sa.Integer, index=True)
87 87 url_domain = sa.Column(sa.Unicode(100), index=True)
88 88 url_path = sa.Column(sa.Unicode(255), index=True)
89 89 tags = sa.Column(JSON(), nullable=False, default={})
90 90 language = sa.Column(sa.Integer(), default=0)
91 91 # this is used to determine partition for the report
92 92 report_group_time = sa.Column(sa.DateTime(), default=datetime.utcnow,
93 93 server_default=sa.func.now())
94 94
95 95 logs = sa.orm.relationship(
96 96 'Log',
97 97 lazy='dynamic',
98 98 passive_deletes=True,
99 99 passive_updates=True,
100 100 primaryjoin="and_(Report.request_id==Log.request_id, "
101 101 "Log.request_id != None, Log.request_id != '')",
102 102 foreign_keys='[Log.request_id]')
103 103
104 104 slow_calls = sa.orm.relationship('SlowCall',
105 105 backref='detail',
106 106 cascade="all, delete-orphan",
107 107 passive_deletes=True,
108 108 passive_updates=True,
109 109 order_by='SlowCall.timestamp')
110 110
111 111 def set_data(self, data, resource, protocol_version=None):
112 112 self.http_status = data['http_status']
113 113 self.priority = data['priority']
114 114 self.error = data['error']
115 115 report_language = data.get('language', '').lower()
116 116 self.language = getattr(Language, report_language, Language.unknown)
117 117 # we need a temp holder here to decide later
118 118 # if we want to commit the tags if the report is marked for creation
119 119 self.tags = {
120 120 'server_name': data['server'],
121 121 'view_name': data['view_name']
122 122 }
123 123 if data.get('tags'):
124 124 for tag_tuple in data['tags']:
125 125 self.tags[tag_tuple[0]] = tag_tuple[1]
126 126 self.traceback = data['traceback']
127 127 stripped_traceback = self.stripped_traceback()
128 128 tb_repr = repr(stripped_traceback).encode('utf8')
129 129 self.traceback_hash = hashlib.sha1(tb_repr).hexdigest()
130 130 url_info = urllib.parse.urlsplit(
131 131 data.get('url', ''), allow_fragments=False)
132 132 self.url_domain = url_info.netloc[:128]
133 133 self.url_path = url_info.path[:2048]
134 134 self.occurences = data['occurences']
135 135 if self.error:
136 136 self.report_type = ReportType.error
137 137 else:
138 138 self.report_type = ReportType.slow
139 139
140 140 # but if its status is 404, it's a 404-type report
141 141 if self.http_status in [404, '404'] or self.error == '404 Not Found':
142 142 self.report_type = ReportType.not_found
143 143 self.error = ''
144 144
145 145 self.generate_grouping_hash(data.get('appenlight.group_string',
146 146 data.get('group_string')),
147 147 resource.default_grouping,
148 148 protocol_version)
149 149
150 150 # details
151 151 if data['http_status'] in [404, '404']:
152 152 data = {"username": data["username"],
153 153 "ip": data["ip"],
154 154 "url": data["url"],
155 155 "user_agent": data["user_agent"]}
156 156 if data.get('HTTP_REFERER') or data.get('http_referer'):
157 157 data['HTTP_REFERER'] = data.get(
158 158 'HTTP_REFERER', '') or data.get('http_referer', '')
159 159
160 160 self.resource_id = resource.resource_id
161 161 self.username = data['username']
162 162 self.user_agent = data['user_agent']
163 163 self.ip = data['ip']
164 164 self.extra = {}
165 165 if data.get('extra'):
166 166 for extra_tuple in data['extra']:
167 167 self.extra[extra_tuple[0]] = extra_tuple[1]
168 168
169 169 self.url = data['url']
170 170 self.request_id = data.get('request_id', '').replace('-', '') or str(
171 171 uuid.uuid4())
172 172 request_data = data.get('request', {})
173 173
174 174 self.request = request_data
175 175 self.request_stats = data.get('request_stats', {})
176 176 traceback = data.get('traceback')
177 177 if not traceback:
178 178 traceback = data.get('frameinfo')
179 179 self.traceback = traceback
180 180 start_date = convert_date(data.get('start_time'))
181 181 if not self.start_time or self.start_time < start_date:
182 182 self.start_time = start_date
183 183
184 184 self.end_time = convert_date(data.get('end_time'), False)
185 185 self.duration = 0
186 186
187 187 if self.start_time and self.end_time:
188 188 d = self.end_time - self.start_time
189 189 self.duration = d.total_seconds()
190 190
191 191 # update tags with other vars
192 192 if self.username:
193 193 self.tags['user_name'] = self.username
194 194 self.tags['report_language'] = Language.key_from_value(self.language)
195 195
196 196 def add_slow_calls(self, data, report_group):
197 197 slow_calls = []
198 198 for call in data.get('slow_calls', []):
199 199 sc_inst = SlowCall()
200 200 sc_inst.set_data(call, resource_id=self.resource_id,
201 201 report_group=report_group)
202 202 slow_calls.append(sc_inst)
203 203 self.slow_calls.extend(slow_calls)
204 204 return slow_calls
205 205
206 206 def get_dict(self, request, details=False, exclude_keys=None,
207 207 include_keys=None):
208 208 from appenlight.models.services.report_group import ReportGroupService
209 209 instance_dict = super(Report, self).get_dict()
210 210 instance_dict['req_stats'] = self.req_stats()
211 211 instance_dict['group'] = {}
212 212 instance_dict['group']['id'] = self.report_group.id
213 213 instance_dict['group'][
214 214 'total_reports'] = self.report_group.total_reports
215 215 instance_dict['group']['last_report'] = self.report_group.last_report
216 216 instance_dict['group']['priority'] = self.report_group.priority
217 217 instance_dict['group']['occurences'] = self.report_group.occurences
218 218 instance_dict['group'][
219 219 'last_timestamp'] = self.report_group.last_timestamp
220 220 instance_dict['group'][
221 221 'first_timestamp'] = self.report_group.first_timestamp
222 222 instance_dict['group']['public'] = self.report_group.public
223 223 instance_dict['group']['fixed'] = self.report_group.fixed
224 224 instance_dict['group']['read'] = self.report_group.read
225 225 instance_dict['group'][
226 226 'average_duration'] = self.report_group.average_duration
227 227
228 228 instance_dict[
229 229 'resource_name'] = self.report_group.application.resource_name
230 230 instance_dict['report_type'] = self.report_type
231 231
232 232 if instance_dict['http_status'] == 404 and not instance_dict['error']:
233 233 instance_dict['error'] = '404 Not Found'
234 234
235 235 if details:
236 236 instance_dict['affected_users_count'] = \
237 237 ReportGroupService.affected_users_count(self.report_group)
238 238 instance_dict['top_affected_users'] = [
239 239 {'username': u.username, 'count': u.count} for u in
240 240 ReportGroupService.top_affected_users(self.report_group)]
241 241 instance_dict['application'] = {'integrations': []}
242 242 for integration in self.report_group.application.integrations:
243 243 if integration.front_visible:
244 244 instance_dict['application']['integrations'].append(
245 245 {'name': integration.integration_name,
246 246 'action': integration.integration_action})
247 247 instance_dict['comments'] = [c.get_dict() for c in
248 248 self.report_group.comments]
249 249
250 250 instance_dict['group']['next_report'] = None
251 251 instance_dict['group']['previous_report'] = None
252 252 next_in_group = self.get_next_in_group(request)
253 253 previous_in_group = self.get_previous_in_group(request)
254 254 if next_in_group:
255 255 instance_dict['group']['next_report'] = next_in_group
256 256 if previous_in_group:
257 257 instance_dict['group']['previous_report'] = previous_in_group
258 258
259 259 # slow call ordering
260 260 def find_parent(row, data):
261 261 for r in reversed(data):
262 262 try:
263 263 if (row['timestamp'] > r['timestamp'] and
264 264 row['end_time'] < r['end_time']):
265 265 return r
266 266 except TypeError as e:
267 267 log.warning('reports_view.find_parent: %s' % e)
268 268 return None
269 269
270 270 new_calls = []
271 271 calls = [c.get_dict() for c in self.slow_calls]
272 272 while calls:
273 273 # start from end
274 274 for x in range(len(calls) - 1, -1, -1):
275 275 parent = find_parent(calls[x], calls)
276 276 if parent:
277 277 parent['children'].append(calls[x])
278 278 else:
279 279 # no parent at all? append to new calls anyway
280 280 new_calls.append(calls[x])
281 281 # print 'append', calls[x]
282 282 del calls[x]
283 283 break
284 284 instance_dict['slow_calls'] = new_calls
285 285
286 286 instance_dict['front_url'] = self.get_public_url(request)
287 287
288 288 exclude_keys_list = exclude_keys or []
289 289 include_keys_list = include_keys or []
290 290 for k in list(instance_dict.keys()):
291 291 if k == 'group':
292 292 continue
293 293 if (k in exclude_keys_list or
294 294 (k not in include_keys_list and include_keys)):
295 295 del instance_dict[k]
296 296 return instance_dict
297 297
298 298 def get_previous_in_group(self, request):
299 299 query = {
300 300 "size": 1,
301 301 "query": {
302 302 "filtered": {
303 303 "filter": {
304 304 "and": [{"term": {"group_id": self.group_id}},
305 305 {"range": {"pg_id": {"lt": self.id}}}]
306 306 }
307 307 }
308 308 },
309 309 "sort": [
310 310 {"_doc": {"order": "desc"}},
311 311 ],
312 312 }
313 result = request.es_conn.search(query, index=self.partition_id,
313 result = request.es_conn.search(body=query, index=self.partition_id,
314 314 doc_type='report')
315 315 if result['hits']['total']:
316 316 return result['hits']['hits'][0]['_source']['pg_id']
317 317
318 318 def get_next_in_group(self, request):
319 319 query = {
320 320 "size": 1,
321 321 "query": {
322 322 "filtered": {
323 323 "filter": {
324 324 "and": [{"term": {"group_id": self.group_id}},
325 325 {"range": {"pg_id": {"gt": self.id}}}]
326 326 }
327 327 }
328 328 },
329 329 "sort": [
330 330 {"_doc": {"order": "asc"}},
331 331 ],
332 332 }
333 result = request.es_conn.search(query, index=self.partition_id,
333 result = request.es_conn.search(body=query, index=self.partition_id,
334 334 doc_type='report')
335 335 if result['hits']['total']:
336 336 return result['hits']['hits'][0]['_source']['pg_id']
337 337
338 338 def get_public_url(self, request=None, report_group=None, _app_url=None):
339 339 """
340 340 Returns the URL a user can use to visit a specific report
341 341 """
342 342 if not request:
343 343 request = get_current_request()
344 344 url = request.route_url('/', _app_url=_app_url)
345 345 if report_group:
346 346 return (url + 'ui/report/%s/%s') % (report_group.id, self.id)
347 347 return (url + 'ui/report/%s/%s') % (self.group_id, self.id)
348 348
349 349 def req_stats(self):
350 350 stats = self.request_stats.copy()
351 351 stats['percentages'] = {}
352 352 stats['percentages']['main'] = 100.0
353 353 main = stats.get('main', 0.0)
354 354 if not main:
355 355 return None
356 356 for name, call_time in stats.items():
357 357 if ('calls' not in name and 'main' not in name and
358 358 'percentages' not in name):
359 359 stats['main'] -= call_time
360 360 stats['percentages'][name] = math.floor(
361 361 (call_time / main * 100.0))
362 362 stats['percentages']['main'] -= stats['percentages'][name]
363 363 if stats['percentages']['main'] < 0.0:
364 364 stats['percentages']['main'] = 0.0
365 365 stats['main'] = 0.0
366 366 return stats
367 367
368 368 def generate_grouping_hash(self, hash_string=None, default_grouping=None,
369 369 protocol_version=None):
370 370 """
371 371 Generates SHA1 hash that will be used to group reports together
372 372 """
373 373 if not hash_string:
374 374 location = self.tags.get('view_name') or self.url_path
375 375 server_name = self.tags.get('server_name') or ''
376 376 if default_grouping == 'url_traceback':
377 377 hash_string = '%s_%s_%s' % (self.traceback_hash, location,
378 378 self.error)
379 379 if self.language == Language.javascript:
380 380 hash_string = '%s_%s' % (self.traceback_hash, self.error)
381 381
382 382 elif default_grouping == 'traceback_server':
383 383 hash_string = '%s_%s' % (self.traceback_hash, server_name)
384 384 if self.language == Language.javascript:
385 385 hash_string = '%s_%s' % (self.traceback_hash, server_name)
386 386 else:
387 387 hash_string = '%s_%s' % (self.error, location)
388 388 month = datetime.utcnow().date().replace(day=1)
389 389 hash_string = '{}_{}'.format(month, hash_string)
390 390 binary_string = hash_string.encode('utf8')
391 391 self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
392 392 return self.grouping_hash
393 393
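For reference, the strings that end up hashed (placeholder values; location is the view name or URL path computed above). The month prefix means identical errors start a fresh group every calendar month:

month = datetime.utcnow().date().replace(day=1)   # e.g. 2017-01-01
# default_grouping == 'url_traceback':
key = '{}_{}'.format(month, '%s_%s_%s' % (traceback_hash, location, error))
# default_grouping == 'traceback_server':
# key = '{}_{}'.format(month, '%s_%s' % (traceback_hash, server_name))
grouping_hash = hashlib.sha1(key.encode('utf8')).hexdigest()
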
394 394 def stripped_traceback(self):
395 395 """
396 396 Traceback without local vars
397 397 """
398 398 stripped_traceback = copy.deepcopy(self.traceback)
399 399
400 400 if isinstance(stripped_traceback, list):
401 401 for row in stripped_traceback:
402 402 row.pop('vars', None)
403 403 return stripped_traceback
404 404
405 405 def notify_channel(self, report_group):
406 406 """
407 407 Sends notification to websocket channel
408 408 """
409 409 settings = get_current_registry().settings
410 410 log.info('notify channelstream')
411 411 if self.report_type != ReportType.error:
412 412 return
413 413 payload = {
414 414 'type': 'message',
415 415 "user": '__system__',
416 416 "channel": 'app_%s' % self.resource_id,
417 417 'message': {
418 418 'topic': 'front_dashboard.new_topic',
419 419 'report': {
420 420 'group': {
421 421 'priority': report_group.priority,
422 422 'first_timestamp': report_group.first_timestamp,
423 423 'last_timestamp': report_group.last_timestamp,
424 424 'average_duration': report_group.average_duration,
425 425 'occurences': report_group.occurences
426 426 },
427 427 'report_id': self.id,
428 428 'group_id': self.group_id,
429 429 'resource_id': self.resource_id,
430 430 'http_status': self.http_status,
431 431 'url_domain': self.url_domain,
432 432 'url_path': self.url_path,
433 433 'error': self.error or '',
434 434 'server': self.tags.get('server_name'),
435 435 'view_name': self.tags.get('view_name'),
436 436 'front_url': self.get_public_url(),
437 437 }
438 438 }
439 439
440 440 }
441 441 channelstream_request(settings['cometd.secret'], '/message', [payload],
442 442 servers=[settings['cometd_servers']])
443 443
444 444 def es_doc(self):
445 445 tags = {}
446 446 tag_list = []
447 447 for name, value in self.tags.items():
448 448 name = name.replace('.', '_')
449 449 tag_list.append(name)
450 450 tags[name] = {
451 451 "values": convert_es_type(value),
452 452 "numeric_values": value if (
453 453 isinstance(value, (int, float)) and
454 454 not isinstance(value, bool)) else None}
455 455
456 456 if 'user_name' not in self.tags and self.username:
457 457 tags["user_name"] = {"value": [self.username],
458 458 "numeric_value": None}
459 459 return {
460 460 '_id': str(self.id),
461 461 'pg_id': str(self.id),
462 462 'resource_id': self.resource_id,
463 463 'http_status': self.http_status or '',
464 464 'start_time': self.start_time,
465 465 'end_time': self.end_time,
466 466 'url_domain': self.url_domain if self.url_domain else '',
467 467 'url_path': self.url_path if self.url_path else '',
468 468 'duration': self.duration,
469 469 'error': self.error if self.error else '',
470 470 'report_type': self.report_type,
471 471 'request_id': self.request_id,
472 472 'ip': self.ip,
473 473 'group_id': str(self.group_id),
474 474 '_parent': str(self.group_id),
475 475 'tags': tags,
476 476 'tag_list': tag_list
477 477 }
478 478
479 479 @property
480 480 def partition_id(self):
481 481 return 'rcae_r_%s' % self.report_group_time.strftime('%Y_%m')
482 482
483 483 def partition_range(self):
484 484 start_date = self.report_group_time.date().replace(day=1)
485 485 end_date = start_date + timedelta(days=40)
486 486 end_date = end_date.replace(day=1)
487 487 return start_date, end_date
488 488
489 489
490 490 def after_insert(mapper, connection, target):
491 491 if not hasattr(target, '_skip_ft_index'):
492 492 data = target.es_doc()
493 493 data.pop('_id', None)
494 494 Datastores.es.index(target.partition_id, 'report', data,
495 495 parent=target.group_id, id=target.id)
496 496
497 497
498 498 def after_update(mapper, connection, target):
499 499 if not hasattr(target, '_skip_ft_index'):
500 500 data = target.es_doc()
501 501 data.pop('_id', None)
502 502 Datastores.es.index(target.partition_id, 'report', data,
503 503 parent=target.group_id, id=target.id)
504 504
505 505
506 506 def after_delete(mapper, connection, target):
507 507 if not hasattr(target, '_skip_ft_index'):
508 query = {'term': {'pg_id': target.id}}
509 Datastores.es.delete_by_query(target.partition_id, 'report', query)
508 query = {"query":{'term': {'pg_id': target.id}}}
509 Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format(target.partition_id, 'report'), body=query)
510 510
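The client used here no longer ships a delete_by_query() helper compatible with the old call, so the deletion goes straight through the transport layer to the /_query endpoint (the delete-by-query plugin API on Elasticsearch 2.x). A hypothetical wrapper like the one below could keep the call sites shorter; it is only a sketch and assumes the plugin is installed on the cluster:

def es_delete_by_query(es, index, doc_type, query):
    # hits the delete-by-query plugin endpoint directly
    return es.transport.perform_request(
        'DELETE', '/{}/{}/_query'.format(index, doc_type),
        body={'query': query})

# usage sketch:
# es_delete_by_query(Datastores.es, target.partition_id, 'report',
#                    {'term': {'pg_id': target.id}})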
511 511
512 512 sa.event.listen(Report, 'after_insert', after_insert)
513 513 sa.event.listen(Report, 'after_update', after_update)
514 514 sa.event.listen(Report, 'after_delete', after_delete)
@@ -1,270 +1,268 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import logging
18 18 import sqlalchemy as sa
19 19
20 20 from datetime import datetime, timedelta
21 21
22 22 from pyramid.threadlocal import get_current_request
23 23 from sqlalchemy.dialects.postgresql import JSON
24 24 from ziggurat_foundations.models.base import BaseModel
25 25
26 26 from appenlight.models import Base, get_db_session, Datastores
27 27 from appenlight.lib.enums import ReportType
28 28 from appenlight.lib.rule import Rule
29 29 from appenlight.lib.redis_keys import REDIS_KEYS
30 30 from appenlight.models.report import REPORT_TYPE_MATRIX
31 31
32 32 log = logging.getLogger(__name__)
33 33
34 34
35 35 class ReportGroup(Base, BaseModel):
36 36 __tablename__ = 'reports_groups'
37 37 __table_args__ = {'implicit_returning': False}
38 38
39 39 id = sa.Column(sa.BigInteger(), nullable=False, primary_key=True)
40 40 resource_id = sa.Column(sa.Integer(),
41 41 sa.ForeignKey('applications.resource_id',
42 42 onupdate='CASCADE',
43 43 ondelete='CASCADE'),
44 44 nullable=False,
45 45 index=True)
46 46 priority = sa.Column(sa.Integer, nullable=False, index=True, default=5,
47 47 server_default='5')
48 48 first_timestamp = sa.Column(sa.DateTime(), default=datetime.utcnow,
49 49 server_default=sa.func.now())
50 50 last_timestamp = sa.Column(sa.DateTime(), default=datetime.utcnow,
51 51 server_default=sa.func.now())
52 52 error = sa.Column(sa.UnicodeText(), index=True)
53 53 grouping_hash = sa.Column(sa.String(40), default='')
54 54 triggered_postprocesses_ids = sa.Column(JSON(), nullable=False,
55 55 default=list)
56 56 report_type = sa.Column(sa.Integer, default=1)
57 57 total_reports = sa.Column(sa.Integer, default=1)
58 58 last_report = sa.Column(sa.Integer)
59 59 occurences = sa.Column(sa.Integer, default=1)
60 60 average_duration = sa.Column(sa.Float, default=0)
61 61 summed_duration = sa.Column(sa.Float, default=0)
62 62 read = sa.Column(sa.Boolean(), index=True, default=False)
63 63 fixed = sa.Column(sa.Boolean(), index=True, default=False)
64 64 notified = sa.Column(sa.Boolean(), index=True, default=False)
65 65 public = sa.Column(sa.Boolean(), index=True, default=False)
66 66
67 67 reports = sa.orm.relationship('Report',
68 68 lazy='dynamic',
69 69 backref='report_group',
70 70 cascade="all, delete-orphan",
71 71 passive_deletes=True,
72 72 passive_updates=True, )
73 73
74 74 comments = sa.orm.relationship('ReportComment',
75 75 lazy='dynamic',
76 76 backref='report',
77 77 cascade="all, delete-orphan",
78 78 passive_deletes=True,
79 79 passive_updates=True,
80 80 order_by="ReportComment.comment_id")
81 81
82 82 assigned_users = sa.orm.relationship('User',
83 83 backref=sa.orm.backref(
84 84 'assigned_reports_relation',
85 85 lazy='dynamic',
86 86 order_by=sa.desc(
87 87 "reports_groups.id")
88 88 ),
89 89 passive_deletes=True,
90 90 passive_updates=True,
91 91 secondary='reports_assignments',
92 92 order_by="User.user_name")
93 93
94 94 stats = sa.orm.relationship('ReportStat',
95 95 lazy='dynamic',
96 96 backref='report',
97 97 passive_deletes=True,
98 98 passive_updates=True, )
99 99
100 100 last_report_ref = sa.orm.relationship('Report',
101 101 uselist=False,
102 102 primaryjoin="ReportGroup.last_report "
103 103 "== Report.id",
104 104 foreign_keys="Report.id",
105 105 cascade="all, delete-orphan",
106 106 passive_deletes=True,
107 107 passive_updates=True, )
108 108
109 109 def __repr__(self):
110 110 return '<ReportGroup id:{}>'.format(self.id)
111 111
112 112 def get_report(self, report_id=None, public=False):
113 113 """
114 114 Gets the report with a specific id, or the latest report if no id was specified
115 115 """
116 116 from .report import Report
117 117
118 118 if not report_id:
119 119 return self.last_report_ref
120 120 else:
121 121 return self.reports.filter(Report.id == report_id).first()
122 122
123 123 def get_public_url(self, request, _app_url=None):
124 124 url = request.route_url('/', _app_url=_app_url)
125 125 return (url + 'ui/report/%s') % self.id
126 126
127 127 def run_postprocessing(self, report):
128 128 """
129 129 Alters report group priority based on postprocessing configuration
130 130 """
131 131 request = get_current_request()
132 132 get_db_session(None, self).flush()
133 133 for action in self.application.postprocess_conf:
134 134 get_db_session(None, self).flush()
135 135 rule_obj = Rule(action.rule, REPORT_TYPE_MATRIX)
136 136 report_dict = report.get_dict(request)
137 137 # if it has not been processed yet
138 138 if (rule_obj.match(report_dict) and
139 139 action.pkey not in self.triggered_postprocesses_ids):
140 140 action.postprocess(self)
141 141 # this way sqla can track mutation of list
142 142 self.triggered_postprocesses_ids = \
143 143 self.triggered_postprocesses_ids + [action.pkey]
144 144
145 145 get_db_session(None, self).flush()
146 146 # do not go out of bounds
147 147 if self.priority < 1:
148 148 self.priority = 1
149 149 if self.priority > 10:
150 150 self.priority = 10
151 151
152 152 def get_dict(self, request):
153 153 instance_dict = super(ReportGroup, self).get_dict()
154 154 instance_dict['server_name'] = self.get_report().tags.get(
155 155 'server_name')
156 156 instance_dict['view_name'] = self.get_report().tags.get('view_name')
157 157 instance_dict['resource_name'] = self.application.resource_name
158 158 instance_dict['report_type'] = self.get_report().report_type
159 159 instance_dict['url_path'] = self.get_report().url_path
160 160 instance_dict['front_url'] = self.get_report().get_public_url(request)
161 161 del instance_dict['triggered_postprocesses_ids']
162 162 return instance_dict
163 163
164 164 def es_doc(self):
165 165 return {
166 166 '_id': str(self.id),
167 167 'pg_id': str(self.id),
168 168 'resource_id': self.resource_id,
169 169 'error': self.error,
170 170 'fixed': self.fixed,
171 171 'public': self.public,
172 172 'read': self.read,
173 173 'priority': self.priority,
174 174 'occurences': self.occurences,
175 175 'average_duration': self.average_duration,
176 176 'summed_duration': self.summed_duration,
177 177 'first_timestamp': self.first_timestamp,
178 178 'last_timestamp': self.last_timestamp
179 179 }
180 180
181 181 def set_notification_info(self, notify_10=False, notify_100=False):
182 182 """
183 183 Update redis notification maps for notification job
184 184 """
185 185 current_time = datetime.utcnow().replace(second=0, microsecond=0)
186 186 # global app counter
187 187 key = REDIS_KEYS['counters']['reports_per_type'].format(
188 188 self.report_type, current_time)
189 189 redis_pipeline = Datastores.redis.pipeline()
190 190 redis_pipeline.incr(key)
191 191 redis_pipeline.expire(key, 3600 * 24)
192 192 # detailed app notification for alerts and notifications
193 193 redis_pipeline.sadd(
194 194 REDIS_KEYS['apps_that_had_reports'], self.resource_id)
195 195 redis_pipeline.sadd(
196 196 REDIS_KEYS['apps_that_had_reports_alerting'], self.resource_id)
197 197 # only notify for exceptions here
198 198 if self.report_type == ReportType.error:
199 199 redis_pipeline.sadd(
200 200 REDIS_KEYS['apps_that_had_reports'], self.resource_id)
201 201 redis_pipeline.sadd(
202 202 REDIS_KEYS['apps_that_had_error_reports_alerting'],
203 203 self.resource_id)
204 204 key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
205 205 redis_pipeline.incr(key)
206 206 redis_pipeline.expire(key, 3600 * 24)
207 207 key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(
208 208 self.id)
209 209 redis_pipeline.incr(key)
210 210 redis_pipeline.expire(key, 3600 * 24)
211 211
212 212 if notify_10:
213 213 key = REDIS_KEYS['counters'][
214 214 'report_group_occurences_10th'].format(self.id)
215 215 redis_pipeline.setex(key, 3600 * 24, 1)
216 216 if notify_100:
217 217 key = REDIS_KEYS['counters'][
218 218 'report_group_occurences_100th'].format(self.id)
219 219 redis_pipeline.setex(key, 3600 * 24, 1)
220 220
221 221 key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
222 222 self.report_type, self.resource_id)
223 223 redis_pipeline.sadd(key, self.id)
224 224 redis_pipeline.expire(key, 3600 * 24)
225 225 key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format(
226 226 self.report_type, self.resource_id)
227 227 redis_pipeline.sadd(key, self.id)
228 228 redis_pipeline.expire(key, 3600 * 24)
229 229 redis_pipeline.execute()
230 230
231 231 @property
232 232 def partition_id(self):
233 233 return 'rcae_r_%s' % self.first_timestamp.strftime('%Y_%m')
234 234
235 235 def partition_range(self):
236 236 start_date = self.first_timestamp.date().replace(day=1)
237 237 end_date = start_date + timedelta(days=40)
238 238 end_date = end_date.replace(day=1)
239 239 return start_date, end_date
240 240
241 241
242 242 def after_insert(mapper, connection, target):
243 243 if not hasattr(target, '_skip_ft_index'):
244 244 data = target.es_doc()
245 245 data.pop('_id', None)
246 246 Datastores.es.index(target.partition_id, 'report_group',
247 247 data, id=target.id)
248 248
249 249
250 250 def after_update(mapper, connection, target):
251 251 if not hasattr(target, '_skip_ft_index'):
252 252 data = target.es_doc()
253 253 data.pop('_id', None)
254 254 Datastores.es.index(target.partition_id, 'report_group',
255 255 data, id=target.id)
256 256
257 257
258 258 def after_delete(mapper, connection, target):
259 query = {'term': {'group_id': target.id}}
260 # TODO: routing seems unnecessary, need to test a bit more
261 # Datastores.es.delete_by_query(target.partition_id, 'report', query,
262 # query_params={'routing':str(target.id)})
263 Datastores.es.delete_by_query(target.partition_id, 'report', query)
264 query = {'term': {'pg_id': target.id}}
265 Datastores.es.delete_by_query(target.partition_id, 'report_group', query)
259 query = {"query": {'term': {'group_id': target.id}}}
260 # delete by query
261 Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format(target.partition_id, 'report'), body=query)
262 query = {"query": {'term': {'pg_id': target.id}}}
263 Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format(target.partition_id, 'report_group'), body=query)
266 264
267 265
268 266 sa.event.listen(ReportGroup, 'after_insert', after_insert)
269 267 sa.event.listen(ReportGroup, 'after_update', after_update)
270 268 sa.event.listen(ReportGroup, 'after_delete', after_delete)
@@ -1,211 +1,211 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import paginate
18 18 import logging
19 19 import sqlalchemy as sa
20 20
21 21 from appenlight.models.log import Log
22 22 from appenlight.models import get_db_session, Datastores
23 23 from appenlight.models.services.base import BaseService
24 24 from appenlight.lib.utils import es_index_name_limiter
25 25
26 26 log = logging.getLogger(__name__)
27 27
28 28
29 29 class LogService(BaseService):
30 30 @classmethod
31 31 def get_logs(cls, resource_ids=None, filter_settings=None,
32 32 db_session=None):
33 33 # ensure we always have ids passed
34 34 if not resource_ids:
35 35 # raise Exception('No App ID passed')
36 36 return []
37 37 db_session = get_db_session(db_session)
38 38 q = db_session.query(Log)
39 39 q = q.filter(Log.resource_id.in_(resource_ids))
40 40 if filter_settings.get('start_date'):
41 41 q = q.filter(Log.timestamp >= filter_settings.get('start_date'))
42 42 if filter_settings.get('end_date'):
43 43 q = q.filter(Log.timestamp <= filter_settings.get('end_date'))
44 44 if filter_settings.get('log_level'):
45 45 q = q.filter(
46 46 Log.log_level == filter_settings.get('log_level').upper())
47 47 if filter_settings.get('request_id'):
48 48 request_id = filter_settings.get('request_id', '')
49 49 q = q.filter(Log.request_id == request_id.replace('-', ''))
50 50 if filter_settings.get('namespace'):
51 51 q = q.filter(Log.namespace == filter_settings.get('namespace'))
52 52 q = q.order_by(sa.desc(Log.timestamp))
53 53 return q
54 54
55 55 @classmethod
56 56 def es_query_builder(cls, app_ids, filter_settings):
57 57 if not filter_settings:
58 58 filter_settings = {}
59 59
60 60 query = {
61 61 "query": {
62 62 "filtered": {
63 63 "filter": {
64 64 "and": [{"terms": {"resource_id": list(app_ids)}}]
65 65 }
66 66 }
67 67 }
68 68 }
69 69
70 70 start_date = filter_settings.get('start_date')
71 71 end_date = filter_settings.get('end_date')
72 72 filter_part = query['query']['filtered']['filter']['and']
73 73
74 74 for tag in filter_settings.get('tags', []):
75 75 tag_values = [v.lower() for v in tag['value']]
76 76 key = "tags.%s.values" % tag['name'].replace('.', '_')
77 77 filter_part.append({"terms": {key: tag_values}})
78 78
79 79 date_range = {"range": {"timestamp": {}}}
80 80 if start_date:
81 81 date_range["range"]["timestamp"]["gte"] = start_date
82 82 if end_date:
83 83 date_range["range"]["timestamp"]["lte"] = end_date
84 84 if start_date or end_date:
85 85 filter_part.append(date_range)
86 86
87 87 levels = filter_settings.get('level')
88 88 if levels:
89 89 filter_part.append({"terms": {'log_level': levels}})
90 90 namespaces = filter_settings.get('namespace')
91 91 if namespaces:
92 92 filter_part.append({"terms": {'namespace': namespaces}})
93 93
94 94 request_ids = filter_settings.get('request_id')
95 95 if request_ids:
96 96 filter_part.append({"terms": {'request_id': request_ids}})
97 97
98 98 messages = filter_settings.get('message')
99 99 if messages:
100 100 query['query']['filtered']['query'] = {
101 101 'match': {
102 102 'message': {
103 103 'query': ' '.join(messages),
104 104 'operator': 'and'
105 105 }
106 106 }
107 107 }
108 108 return query
109 109
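For illustration, a hypothetical filter_settings and the rough shape of the query es_query_builder produces for it (app id and values are made up):

filter_settings = {'start_date': '2017-01-01T00:00:00',
                   'level': ['error'],
                   'message': ['connection refused']}
query = LogService.es_query_builder([1], filter_settings)
# query == {'query': {'filtered': {
#     'filter': {'and': [
#         {'terms': {'resource_id': [1]}},
#         {'range': {'timestamp': {'gte': '2017-01-01T00:00:00'}}},
#         {'terms': {'log_level': ['error']}}]},
#     'query': {'match': {'message': {
#         'query': 'connection refused', 'operator': 'and'}}}}}}
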
110 110 @classmethod
111 111 def get_time_series_aggregate(cls, app_ids=None, filter_settings=None):
112 112 if not app_ids:
113 113 return {}
114 114 es_query = cls.es_query_builder(app_ids, filter_settings)
115 115 es_query["aggs"] = {
116 116 "events_over_time": {
117 117 "date_histogram": {
118 118 "field": "timestamp",
119 119 "interval": "1h",
120 120 "min_doc_count": 0,
121 121 'extended_bounds': {
122 122 'max': filter_settings.get('end_date'),
123 123 'min': filter_settings.get('start_date')}
124 124 }
125 125 }
126 126 }
127 127 log.debug(es_query)
128 128 index_names = es_index_name_limiter(filter_settings.get('start_date'),
129 129 filter_settings.get('end_date'),
130 130 ixtypes=['logs'])
131 131 if index_names:
132 132 results = Datastores.es.search(
133 es_query, index=index_names, doc_type='log', size=0)
133 body=es_query, index=index_names, doc_type='log', size=0)
134 134 else:
135 135 results = []
136 136 return results
137 137
138 138 @classmethod
139 139 def get_search_iterator(cls, app_ids=None, page=1, items_per_page=50,
140 140 order_by=None, filter_settings=None, limit=None):
141 141 if not app_ids:
142 142 return {}, 0
143 143
144 144 es_query = cls.es_query_builder(app_ids, filter_settings)
145 145 sort_query = {
146 146 "sort": [
147 147 {"timestamp": {"order": "desc"}}
148 148 ]
149 149 }
150 150 es_query.update(sort_query)
151 151 log.debug(es_query)
152 152 es_from = (page - 1) * items_per_page
153 153 index_names = es_index_name_limiter(filter_settings.get('start_date'),
154 154 filter_settings.get('end_date'),
155 155 ixtypes=['logs'])
156 156 if not index_names:
157 157 return {}, 0
158 158
159 results = Datastores.es.search(es_query, index=index_names,
159 results = Datastores.es.search(body=es_query, index=index_names,
160 160 doc_type='log', size=items_per_page,
161 es_from=es_from)
161 from_=es_from)
162 162 if results['hits']['total'] > 5000:
163 163 count = 5000
164 164 else:
165 165 count = results['hits']['total']
166 166 return results['hits'], count
167 167
168 168 @classmethod
169 169 def get_paginator_by_app_ids(cls, app_ids=None, page=1, item_count=None,
170 170 items_per_page=50, order_by=None,
171 171 filter_settings=None,
172 172 exclude_columns=None, db_session=None):
173 173 if not filter_settings:
174 174 filter_settings = {}
175 175 results, item_count = cls.get_search_iterator(app_ids, page,
176 176 items_per_page, order_by,
177 177 filter_settings)
178 178 paginator = paginate.Page([],
179 179 item_count=item_count,
180 180 items_per_page=items_per_page,
181 181 **filter_settings)
182 182 ordered_ids = tuple(item['_source']['pg_id']
183 183 for item in results.get('hits', []))
184 184
185 185 sorted_instance_list = []
186 186 if ordered_ids:
187 187 db_session = get_db_session(db_session)
188 188 query = db_session.query(Log)
189 189 query = query.filter(Log.log_id.in_(ordered_ids))
190 190 query = query.order_by(sa.desc('timestamp'))
191 191 sa_items = query.all()
192 192 # resort by score
193 193 for i_id in ordered_ids:
194 194 for item in sa_items:
195 195 if str(item.log_id) == str(i_id):
196 196 sorted_instance_list.append(item)
197 197 paginator.sa_items = sorted_instance_list
198 198 return paginator
199 199
200 200 @classmethod
201 201 def query_by_primary_key_and_namespace(cls, list_of_pairs,
202 202 db_session=None):
203 203 db_session = get_db_session(db_session)
204 204 list_of_conditions = []
205 205 query = db_session.query(Log)
206 206 for pair in list_of_pairs:
207 207 list_of_conditions.append(sa.and_(
208 208 Log.primary_key == pair['pk'], Log.namespace == pair['ns']))
209 209 query = query.filter(sa.or_(*list_of_conditions))
210 210 query = query.order_by(sa.asc(Log.timestamp), sa.asc(Log.log_id))
211 211 return query
@@ -1,451 +1,451 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import logging
18 18 import paginate
19 19 import sqlalchemy as sa
20 20 import appenlight.lib.helpers as h
21 21
22 22 from datetime import datetime
23 23
24 24 from appenlight.models import get_db_session, Datastores
25 25 from appenlight.models.report import Report
26 26 from appenlight.models.report_group import ReportGroup
27 27 from appenlight.models.report_comment import ReportComment
28 28 from appenlight.models.user import User
29 29 from appenlight.models.services.base import BaseService
30 30 from appenlight.lib.enums import ReportType
31 31 from appenlight.lib.utils import es_index_name_limiter
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35
36 36 class ReportGroupService(BaseService):
37 37 @classmethod
38 38 def get_trending(cls, request, filter_settings, limit=15,
39 39 db_session=None):
40 40 """
41 41 Returns report groups trending for a specific time interval
42 42 """
43 43 db_session = get_db_session(db_session)
44 44
45 45 tags = []
46 46 if filter_settings.get('tags'):
47 47 for tag in filter_settings['tags']:
48 48 tags.append(
49 49 {'terms': {
50 50 'tags.{}.values'.format(tag['name']): tag['value']}})
51 51
52 52 index_names = es_index_name_limiter(
53 53 start_date=filter_settings['start_date'],
54 54 end_date=filter_settings['end_date'],
55 55 ixtypes=['reports'])
56 56
57 57 if not index_names or not filter_settings['resource']:
58 58 return []
59 59
60 60 es_query = {
61 61 'aggs': {'parent_agg': {'aggs': {'groups': {'aggs': {
62 62 'sub_agg': {
63 63 'value_count': {'field': 'tags.group_id.values'}}},
64 64 'filter': {'exists': {'field': 'tags.group_id.values'}}}},
65 65 'terms': {'field': 'tags.group_id.values', 'size': limit}}},
66 66 'query': {'filtered': {
67 67 'filter': {'and': [
68 68 {'terms': {
69 69 'resource_id': [filter_settings['resource'][0]]}
70 70 },
71 71 {'range': {'timestamp': {
72 72 'gte': filter_settings['start_date'],
73 73 'lte': filter_settings['end_date']}}}]
74 74 }
75 75 }}
76 76 }
77 77 if tags:
78 78 es_query['query']['filtered']['filter']['and'].extend(tags)
79 79
80 80 result = Datastores.es.search(
81 es_query, index=index_names, doc_type='log', size=0)
81 body=es_query, index=index_names, doc_type='log', size=0)
82 82 series = []
83 83 for bucket in result['aggregations']['parent_agg']['buckets']:
84 84 series.append({
85 85 'key': bucket['key'],
86 86 'groups': bucket['groups']['sub_agg']['value']
87 87 })
88 88
89 89 report_groups_d = {}
90 90 for g in series:
91 91 report_groups_d[int(g['key'])] = g['groups'] or 0
92 92
93 93 query = db_session.query(ReportGroup)
94 94 query = query.filter(ReportGroup.id.in_(list(report_groups_d.keys())))
95 95 query = query.options(
96 96 sa.orm.joinedload(ReportGroup.last_report_ref))
97 97 results = [(report_groups_d[group.id], group,) for group in query]
98 98 return sorted(results, reverse=True, key=lambda x:x[0])
99 99
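get_trending returns a list of (occurrence_count, ReportGroup) tuples sorted by count, for example (resource id and dates are placeholders):

trending = ReportGroupService.get_trending(request, {
    'resource': [1],
    'start_date': '2017-01-01T00:00:00',
    'end_date': '2017-01-02T00:00:00'})
for count, group in trending:
    print(count, group.id, group.error)
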
100 100 @classmethod
101 101 def get_search_iterator(cls, app_ids=None, page=1, items_per_page=50,
102 102 order_by=None, filter_settings=None, limit=None):
103 103 if not app_ids:
104 104 return {}
105 105 if not filter_settings:
106 106 filter_settings = {}
107 107
108 108 query = {
109 109 "size": 0,
110 110 "query": {
111 111 "filtered": {
112 112 "filter": {
113 113 "and": [{"terms": {"resource_id": list(app_ids)}}]
114 114 }
115 115 }
116 116 },
117 117
118 118 "aggs": {
119 119 "top_groups": {
120 120 "terms": {
121 121 "size": 5000,
122 122 "field": "_parent",
123 123 "order": {
124 124 "newest": "desc"
125 125 }
126 126 },
127 127 "aggs": {
128 128 "top_reports_hits": {
129 129 "top_hits": {"size": 1,
130 130 "sort": {"start_time": "desc"}
131 131 }
132 132 },
133 133 "newest": {
134 134 "max": {"field": "start_time"}
135 135 }
136 136 }
137 137 }
138 138 }
139 139 }
140 140
141 141 start_date = filter_settings.get('start_date')
142 142 end_date = filter_settings.get('end_date')
143 143 filter_part = query['query']['filtered']['filter']['and']
144 144 date_range = {"range": {"start_time": {}}}
145 145 if start_date:
146 146 date_range["range"]["start_time"]["gte"] = start_date
147 147 if end_date:
148 148 date_range["range"]["start_time"]["lte"] = end_date
149 149 if start_date or end_date:
150 150 filter_part.append(date_range)
151 151
152 152 priorities = filter_settings.get('priority')
153 153
154 154 for tag in filter_settings.get('tags', []):
155 155 tag_values = [v.lower() for v in tag['value']]
156 156 key = "tags.%s.values" % tag['name'].replace('.', '_')
157 157 filter_part.append({"terms": {key: tag_values}})
158 158
159 159 if priorities:
160 160 filter_part.append({"has_parent": {
161 161 "parent_type": "report_group",
162 162 "query": {
163 163 "terms": {'priority': priorities}
164 164 }}})
165 165
166 166 min_occurences = filter_settings.get('min_occurences')
167 167 if min_occurences:
168 168 filter_part.append({"has_parent": {
169 169 "parent_type": "report_group",
170 170 "query": {
171 171 "range": {'occurences': {"gte": min_occurences[0]}}
172 172 }}})
173 173
174 174 min_duration = filter_settings.get('min_duration')
175 175 max_duration = filter_settings.get('max_duration')
176 176
177 177 request_ids = filter_settings.get('request_id')
178 178 if request_ids:
179 179 filter_part.append({"terms": {'request_id': request_ids}})
180 180
181 181 duration_range = {"range": {"average_duration": {}}}
182 182 if min_duration:
183 183 duration_range["range"]["average_duration"]["gte"] = \
184 184 min_duration[0]
185 185 if max_duration:
186 186 duration_range["range"]["average_duration"]["lte"] = \
187 187 max_duration[0]
188 188 if min_duration or max_duration:
189 189 filter_part.append({"has_parent": {
190 190 "parent_type": "report_group",
191 191 "query": duration_range}})
192 192
193 193 http_status = filter_settings.get('http_status')
194 194 report_type = filter_settings.get('report_type', [ReportType.error])
195 195 # filter by report type (error by default) when no http status was given
196 196 # or when we are dealing with slow reports
197 197 if not http_status or ReportType.slow in report_type:
198 198 filter_part.append({"terms": {'report_type': report_type}})
199 199 if http_status:
200 200 filter_part.append({"terms": {'http_status': http_status}})
201 201
202 202 messages = filter_settings.get('message')
203 203 if messages:
204 204 condition = {'match': {"message": ' '.join(messages)}}
205 205 query['query']['filtered']['query'] = condition
206 206 errors = filter_settings.get('error')
207 207 if errors:
208 208 condition = {'match': {"error": ' '.join(errors)}}
209 209 query['query']['filtered']['query'] = condition
210 210 url_domains = filter_settings.get('url_domain')
211 211 if url_domains:
212 212 condition = {'terms': {"url_domain": url_domains}}
213 213 query['query']['filtered']['query'] = condition
214 214 url_paths = filter_settings.get('url_path')
215 215 if url_paths:
216 216 condition = {'terms': {"url_path": url_paths}}
217 217 query['query']['filtered']['query'] = condition
218 218
219 219 if filter_settings.get('report_status'):
220 220 for status in filter_settings.get('report_status'):
221 221 if status == 'never_reviewed':
222 222 filter_part.append({"has_parent": {
223 223 "parent_type": "report_group",
224 224 "query": {
225 225 "term": {"read": False}
226 226 }}})
227 227 elif status == 'reviewed':
228 228 filter_part.append({"has_parent": {
229 229 "parent_type": "report_group",
230 230 "query": {
231 231 "term": {"read": True}
232 232 }}})
233 233 elif status == 'public':
234 234 filter_part.append({"has_parent": {
235 235 "parent_type": "report_group",
236 236 "query": {
237 237 "term": {"public": True}
238 238 }}})
239 239 elif status == 'fixed':
240 240 filter_part.append({"has_parent": {
241 241 "parent_type": "report_group",
242 242 "query": {
243 243 "term": {"fixed": True}
244 244 }}})
245 245
246 246 # logging.getLogger('elasticsearch').setLevel(logging.DEBUG)
247 247 index_names = es_index_name_limiter(filter_settings.get('start_date'),
248 248 filter_settings.get('end_date'),
249 249 ixtypes=['reports'])
250 250 if index_names:
251 251 results = Datastores.es.search(
252 query, index=index_names, doc_type=["report", "report_group"],
252 body=query, index=index_names, doc_type=["report", "report_group"],
253 253 size=0)
254 254 else:
255 255 return []
256 256 return results['aggregations']
257 257
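The call above is the heart of this migration: pyelasticsearch took the query dict as the first positional argument, while the official elasticsearch-py client expects it through the body keyword. A minimal sketch of the new calling convention, assuming a local node and an illustrative index name (in AppEnlight the configured client lives on Datastores.es):

from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=['127.0.0.1:9200'])  # hypothetical client setup

query = {'query': {'match_all': {}}}
# size=0 asks only for counts/aggregations, no hit documents
result = es.search(body=query, index='rcae_r_2017_01',
                   doc_type=['report', 'report_group'], size=0)
print(result['hits']['total'])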
258 258 @classmethod
259 259 def get_paginator_by_app_ids(cls, app_ids=None, page=1, item_count=None,
260 260 items_per_page=50, order_by=None,
261 261 filter_settings=None,
262 262 exclude_columns=None, db_session=None):
263 263 if not filter_settings:
264 264 filter_settings = {}
265 265 results = cls.get_search_iterator(app_ids, page, items_per_page,
266 266 order_by, filter_settings)
267 267
268 268 ordered_ids = []
269 269 if results:
270 270 for item in results['top_groups']['buckets']:
271 271 pg_id = item['top_reports_hits']['hits']['hits'][0]['_source'][
272 272 'pg_id']
273 273 ordered_ids.append(pg_id)
274 274 log.info(filter_settings)
275 275 paginator = paginate.Page(ordered_ids, items_per_page=items_per_page,
276 276 **filter_settings)
277 277 sa_items = ()
278 278 if paginator.items:
279 279 db_session = get_db_session(db_session)
280 280 # latest report detail
281 281 query = db_session.query(Report)
282 282 query = query.options(sa.orm.joinedload(Report.report_group))
283 283 query = query.filter(Report.id.in_(paginator.items))
284 284 if filter_settings.get('order_col'):
285 285 order_col = filter_settings.get('order_col')
286 286 if filter_settings.get('order_dir') == 'dsc':
287 287 sort_on = 'desc'
288 288 else:
289 289 sort_on = 'asc'
290 290 if order_col == 'when':
291 291 order_col = 'last_timestamp'
292 292 query = query.order_by(getattr(sa, sort_on)(
293 293 getattr(ReportGroup, order_col)))
294 294 sa_items = query.all()
295 295 sorted_instance_list = []
296 296 for i_id in ordered_ids:
297 297 for report in sa_items:
298 298 if (str(report.id) == i_id and
299 299 report not in sorted_instance_list):
300 300 sorted_instance_list.append(report)
301 301 paginator.sa_items = sorted_instance_list
302 302 return paginator
303 303
304 304 @classmethod
305 305 def by_app_ids(cls, app_ids=None, order_by=True, db_session=None):
306 306 db_session = get_db_session(db_session)
307 307 q = db_session.query(ReportGroup)
308 308 if app_ids:
309 309 q = q.filter(ReportGroup.resource_id.in_(app_ids))
310 310 if order_by:
311 311 q = q.order_by(sa.desc(ReportGroup.id))
312 312 return q
313 313
314 314 @classmethod
315 315 def by_id(cls, group_id, app_ids=None, db_session=None):
316 316 db_session = get_db_session(db_session)
317 317 q = db_session.query(ReportGroup).filter(
318 318 ReportGroup.id == int(group_id))
319 319 if app_ids:
320 320 q = q.filter(ReportGroup.resource_id.in_(app_ids))
321 321 return q.first()
322 322
323 323 @classmethod
324 324 def by_ids(cls, group_ids=None, db_session=None):
325 325 db_session = get_db_session(db_session)
326 326 query = db_session.query(ReportGroup)
327 327 query = query.filter(ReportGroup.id.in_(group_ids))
328 328 return query
329 329
330 330 @classmethod
331 331 def by_hash_and_resource(cls, resource_id, grouping_hash, since_when=None,
332 332 db_session=None):
333 333 db_session = get_db_session(db_session)
334 334 q = db_session.query(ReportGroup)
335 335 q = q.filter(ReportGroup.resource_id == resource_id)
336 336 q = q.filter(ReportGroup.grouping_hash == grouping_hash)
337 337 q = q.filter(ReportGroup.fixed == False)
338 338 if since_when:
339 339 q = q.filter(ReportGroup.first_timestamp >= since_when)
340 340 return q.first()
341 341
342 342 @classmethod
343 343 def users_commenting(cls, report_group, exclude_user_id=None,
344 344 db_session=None):
345 345 db_session = get_db_session(None, report_group)
346 346 query = db_session.query(User).distinct()
347 347 query = query.filter(User.id == ReportComment.owner_id)
348 348 query = query.filter(ReportComment.group_id == report_group.id)
349 349 if exclude_user_id:
350 350 query = query.filter(ReportComment.owner_id != exclude_user_id)
351 351 return query
352 352
353 353 @classmethod
354 354 def affected_users_count(cls, report_group, db_session=None):
355 355 db_session = get_db_session(db_session)
356 356 query = db_session.query(sa.func.count(Report.username))
357 357 query = query.filter(Report.group_id == report_group.id)
358 358 query = query.filter(Report.username != '')
359 359 query = query.filter(Report.username != None)
360 360 query = query.group_by(Report.username)
361 361 return query.count()
362 362
363 363 @classmethod
364 364 def top_affected_users(cls, report_group, db_session=None):
365 365 db_session = get_db_session(db_session)
366 366 count_label = sa.func.count(Report.username).label('count')
367 367 query = db_session.query(Report.username, count_label)
368 368 query = query.filter(Report.group_id == report_group.id)
369 369 query = query.filter(Report.username != None)
370 370 query = query.filter(Report.username != '')
371 371 query = query.group_by(Report.username)
372 372 query = query.order_by(sa.desc(count_label))
373 373 query = query.limit(50)
374 374 return query
375 375
376 376 @classmethod
377 377 def get_report_stats(cls, request, filter_settings):
378 378 """
379 379 Gets report dashboard graph data.
380 380 Returns information for bar charts with occurrences per interval.
381 381 The detailed variant returns the time-interval buckets, the
382 382 non-detailed variant returns the total sum.
383 383 """
384 384 delta = filter_settings['end_date'] - filter_settings['start_date']
385 385 if delta < h.time_deltas.get('12h')['delta']:
386 386 interval = '1m'
387 387 elif delta <= h.time_deltas.get('3d')['delta']:
388 388 interval = '5m'
389 389 elif delta >= h.time_deltas.get('2w')['delta']:
390 390 interval = '24h'
391 391 else:
392 392 interval = '1h'
393 393
394 394 group_id = filter_settings.get('group_id')
395 395
396 396 es_query = {
397 397 'aggs': {'parent_agg': {'aggs': {'types': {
398 398 'aggs': {'sub_agg': {'terms': {'field': 'tags.type.values'}}},
399 399 'filter': {
400 400 'and': [{'exists': {'field': 'tags.type.values'}}]}
401 401 }},
402 402 'date_histogram': {'extended_bounds': {
403 403 'max': filter_settings['end_date'],
404 404 'min': filter_settings['start_date']},
405 405 'field': 'timestamp',
406 406 'interval': interval,
407 407 'min_doc_count': 0}}},
408 408 'query': {'filtered': {
409 409 'filter': {'and': [
410 410 {'terms': {
411 411 'resource_id': [filter_settings['resource'][0]]}},
412 412 {'range': {'timestamp': {
413 413 'gte': filter_settings['start_date'],
414 414 'lte': filter_settings['end_date']}}}]
415 415 }
416 416 }}
417 417 }
418 418 if group_id:
419 419 parent_agg = es_query['aggs']['parent_agg']
420 420 filters = parent_agg['aggs']['types']['filter']['and']
421 421 filters.append({'terms': {'tags.group_id.values': [group_id]}})
422 422
423 423 index_names = es_index_name_limiter(
424 424 start_date=filter_settings['start_date'],
425 425 end_date=filter_settings['end_date'],
426 426 ixtypes=['reports'])
427 427
428 428 if not index_names:
429 429 return []
430 430
431 result = Datastores.es.search(es_query,
431 result = Datastores.es.search(body=es_query,
432 432 index=index_names,
433 433 doc_type='log',
434 434 size=0)
435 435 series = []
436 436 for bucket in result['aggregations']['parent_agg']['buckets']:
437 437 point = {
438 438 'x': datetime.utcfromtimestamp(int(bucket['key']) / 1000),
439 439 'report': 0,
440 440 'not_found': 0,
441 441 'slow_report': 0
442 442 }
443 443 for subbucket in bucket['types']['sub_agg']['buckets']:
444 444 if subbucket['key'] == 'slow':
445 445 point['slow_report'] = subbucket['doc_count']
446 446 elif subbucket['key'] == 'error':
447 447 point['report'] = subbucket['doc_count']
448 448 elif subbucket['key'] == 'not_found':
449 449 point['not_found'] = subbucket['doc_count']
450 450 series.append(point)
451 451 return series
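The interval selection at the top of get_report_stats (and again in RequestMetricService further down) maps the width of the requested time window to a date_histogram bucket size. A standalone sketch of that mapping using plain timedeltas in place of the h.time_deltas helper; the thresholds mirror the ones above, while the helper name pick_interval is made up:

from datetime import timedelta

def pick_interval(delta):
    # 12h / 3d / 2w thresholds as used by the report and metric services
    if delta < timedelta(hours=12):
        return '1m'
    elif delta <= timedelta(days=3):
        return '5m'
    elif delta >= timedelta(weeks=2):
        return '24h'
    else:
        return '1h'

print(pick_interval(timedelta(days=1)))  # -> '5m'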
@@ -1,50 +1,50 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from appenlight.models import Datastores
18 18 from appenlight.models.services.base import BaseService
19 19 from appenlight.lib.enums import ReportType
20 20 from appenlight.lib.utils import es_index_name_limiter
21 21
22 22
23 23 class ReportStatService(BaseService):
24 24 @classmethod
25 25 def count_by_type(cls, report_type, resource_id, since_when):
26 26 report_type = ReportType.key_from_value(report_type)
27 27
28 28 index_names = es_index_name_limiter(start_date=since_when,
29 29 ixtypes=['reports'])
30 30
31 31 es_query = {
32 32 'aggs': {'reports': {'aggs': {
33 33 'sub_agg': {'value_count': {'field': 'tags.group_id.values'}}},
34 34 'filter': {'and': [{'terms': {'resource_id': [resource_id]}},
35 35 {'exists': {
36 36 'field': 'tags.group_id.values'}}]}}},
37 37 'query': {'filtered': {'filter': {
38 38 'and': [{'terms': {'resource_id': [resource_id]}},
39 39 {'terms': {'tags.type.values': [report_type]}},
40 40 {'range': {'timestamp': {
41 41 'gte': since_when}}}]}}}}
42 42
43 43 if index_names:
44 result = Datastores.es.search(es_query,
44 result = Datastores.es.search(body=es_query,
45 45 index=index_names,
46 46 doc_type='log',
47 47 size=0)
48 48 return result['aggregations']['reports']['sub_agg']['value']
49 49 else:
50 50 return 0
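count_by_type issues an aggregation-only request (size=0) and reads a single value_count metric back from the response. A reduced sketch of that pattern against a hypothetical index, showing where the returned number comes from:

from elasticsearch import Elasticsearch

es = Elasticsearch()  # assumed local test node

es_query = {
    'aggs': {'reports': {
        'value_count': {'field': 'tags.group_id.values'}}},
    'query': {'filtered': {'filter': {
        'range': {'timestamp': {'gte': 'now-1d'}}}}}
}
result = es.search(body=es_query, index='rcae_r_2017_01',
                   doc_type='log', size=0)
# metric values live under aggregations/<aggregation name>/value
count = result['aggregations']['reports']['value']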
@@ -1,443 +1,443 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from datetime import datetime
18 18
19 19 import appenlight.lib.helpers as h
20 20 from appenlight.models import get_db_session, Datastores
21 21 from appenlight.models.services.base import BaseService
22 22 from appenlight.lib.enums import ReportType
23 23 from appenlight.lib.utils import es_index_name_limiter
24 24
25 25 try:
26 26 from ae_uptime_ce.models.services.uptime_metric import \
27 27 UptimeMetricService
28 28 except ImportError:
29 29 UptimeMetricService = None
30 30
31 31
32 32 def check_key(key, stats, uptime, total_seconds):
33 33 if key not in stats:
34 34 stats[key] = {'name': key,
35 35 'requests': 0,
36 36 'errors': 0,
37 37 'tolerated_requests': 0,
38 38 'frustrating_requests': 0,
39 39 'satisfying_requests': 0,
40 40 'total_minutes': total_seconds / 60.0,
41 41 'uptime': uptime,
42 42 'apdex': 0,
43 43 'rpm': 0,
44 44 'response_time': 0,
45 45 'avg_response_time': 0}
46 46
47 47
48 48 class RequestMetricService(BaseService):
49 49 @classmethod
50 50 def get_metrics_stats(cls, request, filter_settings, db_session=None):
51 51 delta = filter_settings['end_date'] - filter_settings['start_date']
52 52 if delta < h.time_deltas.get('12h')['delta']:
53 53 interval = '1m'
54 54 elif delta <= h.time_deltas.get('3d')['delta']:
55 55 interval = '5m'
56 56 elif delta >= h.time_deltas.get('2w')['delta']:
57 57 interval = '24h'
58 58 else:
59 59 interval = '1h'
60 60
61 61 filter_settings['namespace'] = ['appenlight.request_metric']
62 62
63 63 es_query = {
64 64 'aggs': {
65 65 'parent_agg': {
66 66 'aggs': {'custom': {'aggs': {'sub_agg': {
67 67 'sum': {'field': 'tags.custom.numeric_values'}}},
68 68 'filter': {'exists': {
69 69 'field': 'tags.custom.numeric_values'}}},
70 70 'main': {'aggs': {'sub_agg': {'sum': {
71 71 'field': 'tags.main.numeric_values'}}},
72 72 'filter': {'exists': {
73 73 'field': 'tags.main.numeric_values'}}},
74 74 'nosql': {'aggs': {'sub_agg': {'sum': {
75 75 'field': 'tags.nosql.numeric_values'}}},
76 76 'filter': {'exists': {
77 77 'field': 'tags.nosql.numeric_values'}}},
78 78 'remote': {'aggs': {'sub_agg': {'sum': {
79 79 'field': 'tags.remote.numeric_values'}}},
80 80 'filter': {'exists': {
81 81 'field': 'tags.remote.numeric_values'}}},
82 82 'requests': {'aggs': {'sub_agg': {'sum': {
83 83 'field': 'tags.requests.numeric_values'}}},
84 84 'filter': {'exists': {
85 85 'field': 'tags.requests.numeric_values'}}},
86 86 'sql': {'aggs': {'sub_agg': {
87 87 'sum': {'field': 'tags.sql.numeric_values'}}},
88 88 'filter': {'exists': {
89 89 'field': 'tags.sql.numeric_values'}}},
90 90 'tmpl': {'aggs': {'sub_agg': {'sum': {
91 91 'field': 'tags.tmpl.numeric_values'}}},
92 92 'filter': {'exists': {
93 93 'field': 'tags.tmpl.numeric_values'}}}},
94 94 'date_histogram': {'extended_bounds': {
95 95 'max': filter_settings['end_date'],
96 96 'min': filter_settings['start_date']},
97 97 'field': 'timestamp',
98 98 'interval': interval,
99 99 'min_doc_count': 0}}},
100 100 'query': {'filtered': {
101 101 'filter': {'and': [{'terms': {
102 102 'resource_id': [filter_settings['resource'][0]]}},
103 103 {'range': {'timestamp': {
104 104 'gte': filter_settings['start_date'],
105 105 'lte': filter_settings['end_date']}}},
106 106 {'terms': {'namespace': [
107 107 'appenlight.request_metric']}}]}}}}
108 108
109 109 index_names = es_index_name_limiter(
110 110 start_date=filter_settings['start_date'],
111 111 end_date=filter_settings['end_date'],
112 112 ixtypes=['metrics'])
113 113 if not index_names:
114 114 return []
115 115
116 result = Datastores.es.search(es_query,
116 result = Datastores.es.search(body=es_query,
117 117 index=index_names,
118 118 doc_type='log',
119 119 size=0)
120 120
121 121 plot_data = []
122 122 for item in result['aggregations']['parent_agg']['buckets']:
123 123 x_time = datetime.utcfromtimestamp(int(item['key']) / 1000)
124 124 point = {"x": x_time}
125 125 for key in ['custom', 'main', 'nosql', 'remote',
126 126 'requests', 'sql', 'tmpl']:
127 127 value = item[key]['sub_agg']['value']
128 128 point[key] = round(value, 3) if value else 0
129 129 plot_data.append(point)
130 130
131 131 return plot_data
132 132
133 133 @classmethod
134 134 def get_requests_breakdown(cls, request, filter_settings,
135 135 db_session=None):
136 136 db_session = get_db_session(db_session)
137 137
138 138 # fetch total time of all requests in this time range
139 139 index_names = es_index_name_limiter(
140 140 start_date=filter_settings['start_date'],
141 141 end_date=filter_settings['end_date'],
142 142 ixtypes=['metrics'])
143 143
144 144 if index_names and filter_settings['resource']:
145 145 es_query = {
146 146 'aggs': {'main': {'aggs': {
147 147 'sub_agg': {'sum': {'field': 'tags.main.numeric_values'}}},
148 148 'filter': {'exists': {
149 149 'field': 'tags.main.numeric_values'}}}},
150 150 'query': {'filtered': {
151 151 'filter': {'and': [
152 152 {'terms': {
153 153 'resource_id': [filter_settings['resource'][0]]}},
154 154 {'range': {'timestamp': {
155 155 'gte': filter_settings['start_date'],
156 156 'lte': filter_settings['end_date']}}},
157 157 {'terms': {'namespace': [
158 158 'appenlight.request_metric']}}]}}}}
159 result = Datastores.es.search(es_query,
159 result = Datastores.es.search(body=es_query,
160 160 index=index_names,
161 161 doc_type='log',
162 162 size=0)
163 163 total_time_spent = result['aggregations']['main']['sub_agg'][
164 164 'value']
165 165 else:
166 166 total_time_spent = 0
167 167 script_text = "doc['tags.main.numeric_values'].value / {}".format(
168 168 total_time_spent)
169 169
170 170 if index_names and filter_settings['resource']:
171 171 es_query = {
172 172 'aggs': {
173 173 'parent_agg': {
174 174 'aggs': {'main': {'aggs': {
175 175 'sub_agg': {
176 176 'sum': {'field': 'tags.main.numeric_values'}}},
177 177 'filter': {
178 178 'exists': {
179 179 'field': 'tags.main.numeric_values'}}},
180 180 'percentage': {
181 181 'aggs': {'sub_agg': {
182 182 'sum': {
183 183 'lang': 'expression',
184 184 'script': script_text}}},
185 185 'filter': {
186 186 'exists': {
187 187 'field': 'tags.main.numeric_values'}}},
188 188 'requests': {'aggs': {'sub_agg': {
189 189 'sum': {
190 190 'field': 'tags.requests.numeric_values'}}},
191 191 'filter': {'exists': {
192 192 'field': 'tags.requests.numeric_values'}}}},
193 193 'terms': {'field': 'tags.view_name.values',
194 194 'order': {'percentage>sub_agg': 'desc'},
195 195 'size': 15}}},
196 196 'query': {'filtered': {'filter': {'and': [
197 197 {'terms': {
198 198 'resource_id': [filter_settings['resource'][0]]}},
199 199 {'range': {
200 200 'timestamp': {'gte': filter_settings['start_date'],
201 201 'lte': filter_settings['end_date']
202 202 }
203 203 }
204 204 }
205 205 ]}
206 206 }}
207 207 }
208 result = Datastores.es.search(es_query,
208 result = Datastores.es.search(body=es_query,
209 209 index=index_names,
210 210 doc_type='log',
211 211 size=0)
212 212 series = result['aggregations']['parent_agg']['buckets']
213 213 else:
214 214 series = []
215 215
216 216 and_part = [
217 217 {"term": {"resource_id": filter_settings['resource'][0]}},
218 218 {"terms": {"tags.view_name.values": [row['key'] for
219 219 row in series]}},
220 220 {"term": {"report_type": str(ReportType.slow)}}
221 221 ]
222 222 query = {
223 223 "aggs": {
224 224 "top_reports": {
225 225 "terms": {
226 226 "field": "tags.view_name.values",
227 227 "size": len(series)
228 228 },
229 229 "aggs": {
230 230 "top_calls_hits": {
231 231 "top_hits": {
232 232 "sort": {"start_time": "desc"},
233 233 "size": 5
234 234 }
235 235 }
236 236 }
237 237 }
238 238 },
239 239
240 240 "query": {
241 241 "filtered": {
242 242 "filter": {
243 243 "and": and_part
244 244 }
245 245 }
246 246 }
247 247 }
248 248 details = {}
249 249 index_names = es_index_name_limiter(ixtypes=['reports'])
250 250 if index_names and series:
251 251 result = Datastores.es.search(
252 query, doc_type='report', size=0, index=index_names)
252 body=query, doc_type='report', size=0, index=index_names)
253 253 for bucket in result['aggregations']['top_reports']['buckets']:
254 254 details[bucket['key']] = []
255 255
256 256 for hit in bucket['top_calls_hits']['hits']['hits']:
257 257 details[bucket['key']].append(
258 258 {'report_id': hit['_source']['pg_id'],
259 259 'group_id': hit['_source']['group_id']}
260 260 )
261 261
262 262 results = []
263 263 for row in series:
264 264 result = {
265 265 'key': row['key'],
266 266 'main': row['main']['sub_agg']['value'],
267 267 'requests': row['requests']['sub_agg']['value']
268 268 }
269 269 # es can return 'infinity'
270 270 try:
271 271 result['percentage'] = float(
272 272 row['percentage']['sub_agg']['value'])
273 273 except ValueError:
274 274 result['percentage'] = 0
275 275
276 276 result['latest_details'] = details.get(row['key']) or []
277 277 results.append(result)
278 278
279 279 return results
280 280
281 281 @classmethod
282 282 def get_apdex_stats(cls, request, filter_settings,
283 283 threshold=1, db_session=None):
284 284 """
285 285 Calculates the Apdex score per server for the dashboard
286 286 server information boxes (upper right stats boxes)
287 287 """
288 288 # Apdex t = (Satisfied Count + Tolerated Count / 2) / Total Samples
289 289 db_session = get_db_session(db_session)
290 290 index_names = es_index_name_limiter(
291 291 start_date=filter_settings['start_date'],
292 292 end_date=filter_settings['end_date'], ixtypes=['metrics'])
293 293
294 294 requests_series = []
295 295
296 296 if index_names and filter_settings['resource']:
297 297 es_query = {
298 298 'aggs': {
299 299 'parent_agg': {'aggs': {
300 300 'frustrating': {'aggs': {'sub_agg': {
301 301 'sum': {'field': 'tags.requests.numeric_values'}}},
302 302 'filter': {'and': [
303 303 {'range': {
304 304 'tags.main.numeric_values': {'gte': '4'}}},
305 305 {'exists': {
306 306 'field': 'tags.requests.numeric_values'}}]
307 307 }
308 308 },
309 309 'main': {'aggs': {'sub_agg': {'sum': {
310 310 'field': 'tags.main.numeric_values'}}},
311 311 'filter': {'exists': {
312 312 'field': 'tags.main.numeric_values'}}},
313 313 'requests': {'aggs': {'sub_agg': {
314 314 'sum': {
315 315 'field': 'tags.requests.numeric_values'}}},
316 316 'filter': {'exists': {
317 317 'field': 'tags.requests.numeric_values'}}},
318 318 'tolerated': {'aggs': {'sub_agg': {
319 319 'sum': {
320 320 'field': 'tags.requests.numeric_values'}}},
321 321 'filter': {'and': [
322 322 {'range': {
323 323 'tags.main.numeric_values': {'gte': '1'}}},
324 324 {'range': {
325 325 'tags.main.numeric_values': {'lt': '4'}}},
326 326 {'exists': {
327 327 'field': 'tags.requests.numeric_values'}}]}
328 328 }
329 329 },
330 330 'terms': {'field': 'tags.server_name.values',
331 331 'size': 999999}}},
332 332 'query': {
333 333 'filtered': {
334 334 'filter': {'and': [{'terms': {
335 335 'resource_id': [
336 336 filter_settings['resource'][0]]}},
337 337 {'range': {'timestamp': {
338 338 'gte': filter_settings['start_date'],
339 339 'lte': filter_settings['end_date']}}},
340 340 {'terms': {'namespace': [
341 341 'appenlight.request_metric']}}]}}}}
342 342
343 result = Datastores.es.search(es_query,
343 result = Datastores.es.search(body=es_query,
344 344 index=index_names,
345 345 doc_type='log',
346 346 size=0)
347 347 for bucket in result['aggregations']['parent_agg']['buckets']:
348 348 requests_series.append({
349 349 'frustrating': bucket['frustrating']['sub_agg']['value'],
350 350 'main': bucket['main']['sub_agg']['value'],
351 351 'requests': bucket['requests']['sub_agg']['value'],
352 352 'tolerated': bucket['tolerated']['sub_agg']['value'],
353 353 'key': bucket['key']
354 354 })
355 355
356 356 since_when = filter_settings['start_date']
357 357 until = filter_settings['end_date']
358 358
359 359 # total errors
360 360
361 361 index_names = es_index_name_limiter(
362 362 start_date=filter_settings['start_date'],
363 363 end_date=filter_settings['end_date'], ixtypes=['reports'])
364 364
365 365 report_series = []
366 366 if index_names and filter_settings['resource']:
367 367 report_type = ReportType.key_from_value(ReportType.error)
368 368 es_query = {
369 369 'aggs': {
370 370 'parent_agg': {'aggs': {'errors': {'aggs': {'sub_agg': {
371 371 'sum': {
372 372 'field': 'tags.occurences.numeric_values'}}},
373 373 'filter': {'and': [
374 374 {'terms': {
375 375 'tags.type.values': [report_type]}},
376 376 {'exists': {
377 377 'field': 'tags.occurences.numeric_values'}}]
378 378 }
379 379 }},
380 380 'terms': {'field': 'tags.server_name.values',
381 381 'size': 999999}}},
382 382 'query': {'filtered': {
383 383 'filter': {'and': [
384 384 {'terms': {
385 385 'resource_id': [filter_settings['resource'][0]]}},
386 386 {'range': {
387 387 'timestamp': {'gte': filter_settings['start_date'],
388 388 'lte': filter_settings['end_date']}}
389 389 },
390 390 {'terms': {'namespace': ['appenlight.error']}}]
391 391 }
392 392 }}
393 393 }
394 result = Datastores.es.search(es_query,
394 result = Datastores.es.search(body=es_query,
395 395 index=index_names,
396 396 doc_type='log',
397 397 size=0)
398 398 for bucket in result['aggregations']['parent_agg']['buckets']:
399 399 report_series.append(
400 400 {'key': bucket['key'],
401 401 'errors': bucket['errors']['sub_agg']['value']
402 402 }
403 403 )
404 404
405 405 stats = {}
406 406 if UptimeMetricService is not None:
407 407 uptime = UptimeMetricService.get_uptime_by_app(
408 408 filter_settings['resource'][0],
409 409 since_when=since_when, until=until)
410 410 else:
411 411 uptime = 0
412 412
413 413 total_seconds = (until - since_when).total_seconds()
414 414
415 415 for stat in requests_series:
416 416 check_key(stat['key'], stats, uptime, total_seconds)
417 417 stats[stat['key']]['requests'] = int(stat['requests'])
418 418 stats[stat['key']]['response_time'] = stat['main']
419 419 stats[stat['key']]['tolerated_requests'] = stat['tolerated']
420 420 stats[stat['key']]['frustrating_requests'] = stat['frustrating']
421 421 for server in report_series:
422 422 check_key(server['key'], stats, uptime, total_seconds)
423 423 stats[server['key']]['errors'] = server['errors']
424 424
425 425 server_stats = list(stats.values())
426 426 for stat in server_stats:
427 427 stat['satisfying_requests'] = stat['requests'] - stat['errors'] \
428 428 - stat['frustrating_requests'] - \
429 429 stat['tolerated_requests']
430 430 if stat['satisfying_requests'] < 0:
431 431 stat['satisfying_requests'] = 0
432 432
433 433 if stat['requests']:
434 434 stat['avg_response_time'] = round(stat['response_time'] /
435 435 stat['requests'], 3)
436 436 qual_requests = stat['satisfying_requests'] + \
437 437 stat['tolerated_requests'] / 2.0
438 438 stat['apdex'] = round((qual_requests / stat['requests']) * 100,
439 439 2)
440 440 stat['rpm'] = round(stat['requests'] / stat['total_minutes'],
441 441 2)
442 442
443 443 return sorted(server_stats, key=lambda x: x['name'])
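The closing block of get_apdex_stats applies the standard formula noted in the comment above: Apdex_t = (satisfied + tolerated / 2) / total. A small worked sketch with made-up counts that reproduces the same rounding; the helper name apdex_score and the sample numbers are illustrative only:

def apdex_score(requests, errors, frustrating, tolerated,
                response_time, total_minutes):
    # satisfied requests are whatever remains after errors, frustrating
    # and tolerated requests are subtracted (floored at zero)
    satisfying = max(requests - errors - frustrating - tolerated, 0)
    qualifying = satisfying + tolerated / 2.0
    return {
        'apdex': round(qualifying / requests * 100, 2),
        'rpm': round(requests / total_minutes, 2),
        'avg_response_time': round(response_time / requests, 3),
    }

# made-up sample: 1000 requests in an hour, 450s total response time
print(apdex_score(requests=1000, errors=20, frustrating=30, tolerated=100,
                  response_time=450.0, total_minutes=60))
# -> {'apdex': 90.0, 'rpm': 16.67, 'avg_response_time': 0.45}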
@@ -1,169 +1,169 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from appenlight.models import get_db_session, Datastores
18 18 from appenlight.models.report import Report
19 19 from appenlight.models.services.base import BaseService
20 20 from appenlight.lib.utils import es_index_name_limiter
21 21
22 22
23 23 class SlowCallService(BaseService):
24 24 @classmethod
25 25 def get_time_consuming_calls(cls, request, filter_settings,
26 26 db_session=None):
27 27 db_session = get_db_session(db_session)
28 28 # get slow calls from older partitions too
29 29 index_names = es_index_name_limiter(
30 30 start_date=filter_settings['start_date'],
31 31 end_date=filter_settings['end_date'],
32 32 ixtypes=['slow_calls'])
33 33 if index_names and filter_settings['resource']:
34 34 # get the statement hashes that took the longest total time
35 35 es_query = {
36 36 'aggs': {
37 37 'parent_agg': {
38 38 'aggs': {
39 39 'duration': {
40 40 'aggs': {'sub_agg': {
41 41 'sum': {
42 42 'field': 'tags.duration.numeric_values'}
43 43 }},
44 44 'filter': {'exists': {
45 45 'field': 'tags.duration.numeric_values'}}},
46 46 'total': {
47 47 'aggs': {'sub_agg': {'value_count': {
48 48 'field': 'tags.statement_hash.values'}}},
49 49 'filter': {'exists': {
50 50 'field': 'tags.statement_hash.values'}}}},
51 51 'terms': {'field': 'tags.statement_hash.values',
52 52 'order': {'duration>sub_agg': 'desc'},
53 53 'size': 15}}},
54 54 'query': {'filtered': {
55 55 'filter': {'and': [
56 56 {'terms': {
57 57 'resource_id': [filter_settings['resource'][0]]
58 58 }},
59 59 {'range': {'timestamp': {
60 60 'gte': filter_settings['start_date'],
61 61 'lte': filter_settings['end_date']}
62 62 }}]
63 63 }
64 64 }
65 65 }
66 66 }
67 67 result = Datastores.es.search(
68 es_query, index=index_names, doc_type='log', size=0)
68 body=es_query, index=index_names, doc_type='log', size=0)
69 69 results = result['aggregations']['parent_agg']['buckets']
70 70 else:
71 71 return []
72 72 hashes = [i['key'] for i in results]
73 73
74 74 # get queries associated with hashes
75 75 calls_query = {
76 76 "aggs": {
77 77 "top_calls": {
78 78 "terms": {
79 79 "field": "tags.statement_hash.values",
80 80 "size": 15
81 81 },
82 82 "aggs": {
83 83 "top_calls_hits": {
84 84 "top_hits": {
85 85 "sort": {"timestamp": "desc"},
86 86 "size": 5
87 87 }
88 88 }
89 89 }
90 90 }
91 91 },
92 92 "query": {
93 93 "filtered": {
94 94 "filter": {
95 95 "and": [
96 96 {
97 97 "terms": {
98 98 "resource_id": [
99 99 filter_settings['resource'][0]
100 100 ]
101 101 }
102 102 },
103 103 {
104 104 "terms": {
105 105 "tags.statement_hash.values": hashes
106 106 }
107 107 },
108 108 {
109 109 "range": {
110 110 "timestamp": {
111 111 "gte": filter_settings['start_date'],
112 112 "lte": filter_settings['end_date']
113 113 }
114 114 }
115 115 }
116 116 ]
117 117 }
118 118 }
119 119 }
120 120 }
121 calls = Datastores.es.search(calls_query,
121 calls = Datastores.es.search(body=calls_query,
122 122 index=index_names,
123 123 doc_type='log',
124 124 size=0)
125 125 call_results = {}
126 126 report_ids = []
127 127 for call in calls['aggregations']['top_calls']['buckets']:
128 128 hits = call['top_calls_hits']['hits']['hits']
129 129 call_results[call['key']] = [i['_source'] for i in hits]
130 130 report_ids.extend([i['_source']['tags']['report_id']['values']
131 131 for i in hits])
132 132 if report_ids:
133 133 r_query = db_session.query(Report.group_id, Report.id)
134 134 r_query = r_query.filter(Report.id.in_(report_ids))
135 135 r_query = r_query.filter(
136 136 Report.start_time >= filter_settings['start_date'])
137 137 else:
138 138 r_query = []
139 139 reports_reversed = {}
140 140 for report in r_query:
141 141 reports_reversed[report.id] = report.group_id
142 142
143 143 final_results = []
144 144 for item in results:
145 145 if item['key'] not in call_results:
146 146 continue
147 147 call = call_results[item['key']][0]
148 148 row = {'occurences': item['total']['sub_agg']['value'],
149 149 'total_duration': round(
150 150 item['duration']['sub_agg']['value']),
151 151 'statement': call['message'],
152 152 'statement_type': call['tags']['type']['values'],
153 153 'statement_subtype': call['tags']['subtype']['values'],
154 154 'statement_hash': item['key'],
155 155 'latest_details': []}
156 156 if row['statement_type'] in ['tmpl', 'remote']:
157 157 params = call['tags']['parameters']['values'] \
158 158 if 'parameters' in call['tags'] else ''
159 159 row['statement'] = '{} ({})'.format(call['message'], params)
160 160 for call in call_results[item['key']]:
161 161 report_id = call['tags']['report_id']['values']
162 162 group_id = reports_reversed.get(report_id)
163 163 if group_id:
164 164 row['latest_details'].append(
165 165 {'group_id': group_id, 'report_id': report_id})
166 166
167 167 final_results.append(row)
168 168
169 169 return final_results
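get_time_consuming_calls works in two passes: first a terms aggregation over statement hashes ordered by their summed duration, then a top_hits query that fetches example documents for those hashes. A stripped-down sketch of the first pass; the field names and the 'duration>sub_agg' ordering are taken from the query above, while the client and index name are assumptions:

from elasticsearch import Elasticsearch

es = Elasticsearch()  # assumed local node

top_hashes_query = {
    'aggs': {'parent_agg': {
        'terms': {'field': 'tags.statement_hash.values',
                  # order buckets by the summed duration sub-aggregation
                  'order': {'duration>sub_agg': 'desc'},
                  'size': 15},
        'aggs': {'duration': {
            'filter': {'exists': {'field': 'tags.duration.numeric_values'}},
            'aggs': {'sub_agg': {
                'sum': {'field': 'tags.duration.numeric_values'}}}}}}}
}
result = es.search(body=top_hashes_query, index='rcae_sc_2017_01',
                   doc_type='log', size=0)
hashes = [b['key'] for b in
          result['aggregations']['parent_agg']['buckets']]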
@@ -1,430 +1,443 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import argparse
18 18 import datetime
19 19 import logging
20 20
21 21 import sqlalchemy as sa
22 import elasticsearch.exceptions
23 import elasticsearch.helpers
24
22 25 from collections import defaultdict
23 26 from pyramid.paster import setup_logging
24 27 from pyramid.paster import bootstrap
25 28 from appenlight.models import (
26 29 DBSession,
27 30 Datastores,
28 31 metadata
29 32 )
30 33 from appenlight.lib import get_callable
31 34 from appenlight.models.report_group import ReportGroup
32 35 from appenlight.models.report import Report
33 36 from appenlight.models.report_stat import ReportStat
34 37 from appenlight.models.log import Log
35 38 from appenlight.models.slow_call import SlowCall
36 39 from appenlight.models.metric import Metric
37 40
38 41
39 42 log = logging.getLogger(__name__)
40 43
41 44 tables = {
42 45 'slow_calls_p_': [],
43 46 'reports_stats_p_': [],
44 47 'reports_p_': [],
45 48 'reports_groups_p_': [],
46 49 'logs_p_': [],
47 50 'metrics_p_': [],
48 51 }
49 52
50 53 def detect_tables(table_prefix):
51 54 found_tables = []
52 55 db_tables_query = '''
53 56 SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
54 57 tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;'''
55 58
56 59 for table in DBSession.execute(db_tables_query).fetchall():
57 60 tablename = table.tablename
58 61 if tablename.startswith(table_prefix):
59 62 t = sa.Table(tablename, metadata, autoload=True,
60 63 autoload_with=DBSession.bind.engine)
61 64 found_tables.append(t)
62 65 return found_tables
63 66
64 67
65 68 def main():
66 69 """
67 70 Recreates Elasticsearch indexes.
68 71 Performs a reindex of the whole database into Elasticsearch.
69 72
70 73 """
71 74
72 75 # we need the parser twice because we first need to load the ini file,
73 76 # bootstrap pyramid and only then load the plugins
74 77 pre_parser = argparse.ArgumentParser(
75 78 description='Reindex AppEnlight data',
76 79 add_help=False)
77 80 pre_parser.add_argument('-c', '--config', required=True,
78 81 help='Configuration ini file of application')
79 82 pre_parser.add_argument('-h', '--help', help='Show help', nargs='?')
80 83 pre_parser.add_argument('-t', '--types', nargs='+',
81 84 help='Which parts of database should get reindexed')
82 85 args = pre_parser.parse_args()
83 86
84 87 config_uri = args.config
85 88 setup_logging(config_uri)
86 89 log.setLevel(logging.INFO)
87 90 env = bootstrap(config_uri)
88 91 parser = argparse.ArgumentParser(description='Reindex AppEnlight data')
89 92 choices = {
90 93 'reports': 'appenlight.scripts.reindex_elasticsearch:reindex_reports',
91 94 'logs': 'appenlight.scripts.reindex_elasticsearch:reindex_logs',
92 95 'metrics': 'appenlight.scripts.reindex_elasticsearch:reindex_metrics',
93 96 'slow_calls': 'appenlight.scripts.reindex_elasticsearch:reindex_slow_calls',
94 97 'template': 'appenlight.scripts.reindex_elasticsearch:update_template'
95 98 }
96 99 for k, v in env['registry'].appenlight_plugins.items():
97 100 if v.get('fulltext_indexer'):
98 101 choices[k] = v['fulltext_indexer']
99 102 parser.add_argument('-t', '--types', nargs='*',
100 choices=['all'] + list(choices.keys()), default=['all'],
103 choices=['all'] + list(choices.keys()), default=[],
101 104 help='Which parts of database should get reindexed')
102 105 parser.add_argument('-c', '--config', required=True,
103 106 help='Configuration ini file of application')
104 107 args = parser.parse_args()
105 108
106 109
107 110 if 'all' in args.types:
108 111 args.types = list(choices.keys())
109 112
113 print("Selected types to reindex: {}".format(args.types))
114
110 115 log.info('settings {}'.format(args.types))
111 116
112 117 if 'template' in args.types:
113 118 get_callable(choices['template'])()
114 119 args.types.remove('template')
115 120 for selected in args.types:
116 121 get_callable(choices[selected])()
117 122
118 123
119 124 def update_template():
120 125 try:
121 Datastores.es.send_request("delete", ['_template', 'rcae'],
122 query_params={})
123 except Exception as e:
124 print(e)
126 Datastores.es.indices.delete_template('rcae')
127 except elasticsearch.exceptions.NotFoundError as e:
128 log.error(e)
125 129 log.info('updating elasticsearch template')
126 130 tag_templates = [
127 131 {"values": {
128 132 "path_match": "tags.*",
129 133 "mapping": {
130 134 "type": "object",
131 135 "properties": {
132 136 "values": {"type": "string", "analyzer": "tag_value"},
133 137 "numeric_values": {"type": "float"}
134 138 }
135 139 }
136 140 }}
137 141 ]
138 142
139 143 template_schema = {
140 144 "template": "rcae_*",
141 145 "settings": {
142 146 "index": {
143 147 "refresh_interval": "5s",
144 148 "translog": {"sync_interval": "5s",
145 149 "durability": "async"}
146 150 },
147 151 "number_of_shards": 5,
148 152 "analysis": {
149 153 "analyzer": {
150 154 "url_path": {
151 155 "type": "custom",
152 156 "char_filter": [],
153 157 "tokenizer": "path_hierarchy",
154 158 "filter": []
155 159 },
156 160 "tag_value": {
157 161 "type": "custom",
158 162 "char_filter": [],
159 163 "tokenizer": "keyword",
160 164 "filter": ["lowercase"]
161 165 },
162 166 }
163 167 },
164 168 },
165 169 "mappings": {
166 170 "report_group": {
167 171 "_all": {"enabled": False},
168 172 "dynamic_templates": tag_templates,
169 173 "properties": {
170 174 "pg_id": {"type": "string", "index": "not_analyzed"},
171 175 "resource_id": {"type": "integer"},
172 176 "priority": {"type": "integer"},
173 177 "error": {"type": "string", "analyzer": "simple"},
174 178 "read": {"type": "boolean"},
175 179 "occurences": {"type": "integer"},
176 180 "fixed": {"type": "boolean"},
177 181 "first_timestamp": {"type": "date"},
178 182 "last_timestamp": {"type": "date"},
179 183 "average_duration": {"type": "float"},
180 184 "summed_duration": {"type": "float"},
181 185 "public": {"type": "boolean"}
182 186 }
183 187 },
184 188 "report": {
185 189 "_all": {"enabled": False},
186 190 "dynamic_templates": tag_templates,
187 191 "properties": {
188 192 "pg_id": {"type": "string", "index": "not_analyzed"},
189 193 "resource_id": {"type": "integer"},
190 194 "group_id": {"type": "string"},
191 195 "http_status": {"type": "integer"},
192 196 "ip": {"type": "string", "index": "not_analyzed"},
193 197 "url_domain": {"type": "string", "analyzer": "simple"},
194 198 "url_path": {"type": "string", "analyzer": "url_path"},
195 199 "error": {"type": "string", "analyzer": "simple"},
196 200 "report_type": {"type": "integer"},
197 201 "start_time": {"type": "date"},
198 202 "request_id": {"type": "string", "index": "not_analyzed"},
199 203 "end_time": {"type": "date"},
200 204 "duration": {"type": "float"},
201 205 "tags": {
202 206 "type": "object"
203 207 },
204 208 "tag_list": {"type": "string", "analyzer": "tag_value"},
205 209 "extra": {
206 210 "type": "object"
207 211 },
208 212 },
209 213 "_parent": {"type": "report_group"}
210 214 },
211 215 "log": {
212 216 "_all": {"enabled": False},
213 217 "dynamic_templates": tag_templates,
214 218 "properties": {
215 219 "pg_id": {"type": "string", "index": "not_analyzed"},
216 220 "delete_hash": {"type": "string", "index": "not_analyzed"},
217 221 "resource_id": {"type": "integer"},
218 222 "timestamp": {"type": "date"},
219 223 "permanent": {"type": "boolean"},
220 224 "request_id": {"type": "string", "index": "not_analyzed"},
221 225 "log_level": {"type": "string", "analyzer": "simple"},
222 226 "message": {"type": "string", "analyzer": "simple"},
223 227 "namespace": {"type": "string", "index": "not_analyzed"},
224 228 "tags": {
225 229 "type": "object"
226 230 },
227 231 "tag_list": {"type": "string", "analyzer": "tag_value"}
228 232 }
229 233 }
230 234 }
231 235 }
232 236
233 Datastores.es.send_request('PUT', ['_template', 'rcae'],
234 body=template_schema, query_params={})
237 Datastores.es.indices.put_template('rcae', body=template_schema)
235 238
236 239
237 240 def reindex_reports():
238 241 reports_groups_tables = detect_tables('reports_groups_p_')
239 242 try:
240 Datastores.es.delete_index('rcae_r*')
241 except Exception as e:
243 Datastores.es.indices.delete('rcae_r*')
244 except elasticsearch.exceptions.NotFoundError as e:
242 245 log.error(e)
243 246
244 247 log.info('reindexing report groups')
245 248 i = 0
246 249 task_start = datetime.datetime.now()
247 250 for partition_table in reports_groups_tables:
248 251 conn = DBSession.connection().execution_options(stream_results=True)
249 252 result = conn.execute(partition_table.select())
250 253 while True:
251 254 chunk = result.fetchmany(2000)
252 255 if not chunk:
253 256 break
254 257 es_docs = defaultdict(list)
255 258 for row in chunk:
256 259 i += 1
257 260 item = ReportGroup(**dict(list(row.items())))
258 261 d_range = item.partition_id
259 262 es_docs[d_range].append(item.es_doc())
260 263 if es_docs:
261 264 name = partition_table.name
262 265 log.info('round {}, {}'.format(i, name))
263 266 for k, v in es_docs.items():
264 Datastores.es.bulk_index(k, 'report_group', v,
265 id_field="_id")
267 to_update = {'_index': k, '_type': 'report_group'}
268 [doc.update(to_update) for doc in v]
269 elasticsearch.helpers.bulk(Datastores.es, v)
266 270
267 271 log.info(
268 272 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
269 273
270 274 i = 0
271 275 log.info('reindexing reports')
272 276 task_start = datetime.datetime.now()
273 277 reports_tables = detect_tables('reports_p_')
274 278 for partition_table in reports_tables:
275 279 conn = DBSession.connection().execution_options(stream_results=True)
276 280 result = conn.execute(partition_table.select())
277 281 while True:
278 282 chunk = result.fetchmany(2000)
279 283 if not chunk:
280 284 break
281 285 es_docs = defaultdict(list)
282 286 for row in chunk:
283 287 i += 1
284 288 item = Report(**dict(list(row.items())))
285 289 d_range = item.partition_id
286 290 es_docs[d_range].append(item.es_doc())
287 291 if es_docs:
288 292 name = partition_table.name
289 293 log.info('round {}, {}'.format(i, name))
290 294 for k, v in es_docs.items():
291 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
292 parent_field='_parent')
295 to_update = {'_index': k, '_type': 'report'}
296 [doc.update(to_update) for doc in v]
297 elasticsearch.helpers.bulk(Datastores.es, v)
293 298
294 299 log.info(
295 300 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
296 301
297 302 log.info('reindexing reports stats')
298 303 i = 0
299 304 task_start = datetime.datetime.now()
300 305 reports_stats_tables = detect_tables('reports_stats_p_')
301 306 for partition_table in reports_stats_tables:
302 307 conn = DBSession.connection().execution_options(stream_results=True)
303 308 result = conn.execute(partition_table.select())
304 309 while True:
305 310 chunk = result.fetchmany(2000)
306 311 if not chunk:
307 312 break
308 313 es_docs = defaultdict(list)
309 314 for row in chunk:
310 315 rd = dict(list(row.items()))
311 316 # remove legacy columns
312 317 # TODO: remove the column later
313 318 rd.pop('size', None)
314 319 item = ReportStat(**rd)
315 320 i += 1
316 321 d_range = item.partition_id
317 322 es_docs[d_range].append(item.es_doc())
318 323 if es_docs:
319 324 name = partition_table.name
320 325 log.info('round {}, {}'.format(i, name))
321 326 for k, v in es_docs.items():
322 Datastores.es.bulk_index(k, 'log', v)
327 to_update = {'_index': k, '_type': 'log'}
328 [doc.update(to_update) for doc in v]
329 elasticsearch.helpers.bulk(Datastores.es, v)
323 330
324 331 log.info(
325 332 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
326 333
327 334
328 335 def reindex_logs():
329 336 try:
330 Datastores.es.delete_index('rcae_l*')
331 except Exception as e:
337 Datastores.es.indices.delete('rcae_l*')
338 except elasticsearch.exceptions.NotFoundError as e:
332 339 log.error(e)
333 340
334 341 # logs
335 342 log.info('reindexing logs')
336 343 i = 0
337 344 task_start = datetime.datetime.now()
338 345 log_tables = detect_tables('logs_p_')
339 346 for partition_table in log_tables:
340 347 conn = DBSession.connection().execution_options(stream_results=True)
341 348 result = conn.execute(partition_table.select())
342 349 while True:
343 350 chunk = result.fetchmany(2000)
344 351 if not chunk:
345 352 break
346 353 es_docs = defaultdict(list)
347 354
348 355 for row in chunk:
349 356 i += 1
350 357 item = Log(**dict(list(row.items())))
351 358 d_range = item.partition_id
352 359 es_docs[d_range].append(item.es_doc())
353 360 if es_docs:
354 361 name = partition_table.name
355 362 log.info('round {}, {}'.format(i, name))
356 363 for k, v in es_docs.items():
357 Datastores.es.bulk_index(k, 'log', v)
364 to_update = {'_index': k, '_type': 'log'}
365 [doc.update(to_update) for doc in v]
366 elasticsearch.helpers.bulk(Datastores.es, v)
358 367
359 368 log.info(
360 369 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
361 370
362 371
363 372 def reindex_metrics():
364 373 try:
365 Datastores.es.delete_index('rcae_m*')
366 except Exception as e:
367 print(e)
374 Datastores.es.indices.delete('rcae_m*')
375 except elasticsearch.exceptions.NotFoundError as e:
376 log.error(e)
368 377
369 378 log.info('reindexing applications metrics')
370 379 i = 0
371 380 task_start = datetime.datetime.now()
372 381 metric_tables = detect_tables('metrics_p_')
373 382 for partition_table in metric_tables:
374 383 conn = DBSession.connection().execution_options(stream_results=True)
375 384 result = conn.execute(partition_table.select())
376 385 while True:
377 386 chunk = result.fetchmany(2000)
378 387 if not chunk:
379 388 break
380 389 es_docs = defaultdict(list)
381 390 for row in chunk:
382 391 i += 1
383 392 item = Metric(**dict(list(row.items())))
384 393 d_range = item.partition_id
385 394 es_docs[d_range].append(item.es_doc())
386 395 if es_docs:
387 396 name = partition_table.name
388 397 log.info('round {}, {}'.format(i, name))
389 398 for k, v in es_docs.items():
390 Datastores.es.bulk_index(k, 'log', v)
399 to_update = {'_index': k, '_type': 'log'}
400 [doc.update(to_update) for doc in v]
401 elasticsearch.helpers.bulk(Datastores.es, v)
391 402
392 403 log.info(
393 404 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
394 405
395 406
396 407 def reindex_slow_calls():
397 408 try:
398 Datastores.es.delete_index('rcae_sc*')
399 except Exception as e:
400 print(e)
409 Datastores.es.indices.delete('rcae_sc*')
410 except elasticsearch.exceptions.NotFoundError as e:
411 log.error(e)
401 412
402 413 log.info('reindexing slow calls')
403 414 i = 0
404 415 task_start = datetime.datetime.now()
405 416 slow_calls_tables = detect_tables('slow_calls_p_')
406 417 for partition_table in slow_calls_tables:
407 418 conn = DBSession.connection().execution_options(stream_results=True)
408 419 result = conn.execute(partition_table.select())
409 420 while True:
410 421 chunk = result.fetchmany(2000)
411 422 if not chunk:
412 423 break
413 424 es_docs = defaultdict(list)
414 425 for row in chunk:
415 426 i += 1
416 427 item = SlowCall(**dict(list(row.items())))
417 428 d_range = item.partition_id
418 429 es_docs[d_range].append(item.es_doc())
419 430 if es_docs:
420 431 name = partition_table.name
421 432 log.info('round {}, {}'.format(i, name))
422 433 for k, v in es_docs.items():
423 Datastores.es.bulk_index(k, 'log', v)
434 to_update = {'_index': k, '_type': 'log'}
435 [doc.update(to_update) for doc in v]
436 elasticsearch.helpers.bulk(Datastores.es, v)
424 437
425 438 log.info(
426 439 'total docs {} {}'.format(i, datetime.datetime.now() - task_start))
427 440
428 441
429 442 if __name__ == '__main__':
430 443 main()
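All of the reindex functions above follow the same pattern: stamp every document dict with its target _index and _type, then hand the whole list to elasticsearch.helpers.bulk, which replaces pyelasticsearch's bulk_index. A minimal sketch of that pattern with hypothetical documents (the real ones come from each model's es_doc()):

from elasticsearch import Elasticsearch
import elasticsearch.helpers

es = Elasticsearch()  # assumed local node

docs = [
    {'_id': 'abc-1', 'message': 'first log line'},
    {'_id': 'abc-2', 'message': 'second log line'},
]
# the bulk helper reads _index, _type and _id straight from each action dict
for doc in docs:
    doc.update({'_index': 'rcae_l_2017_01', '_type': 'log'})
success, errors = elasticsearch.helpers.bulk(es, docs)
print('indexed', success, 'documents')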
@@ -1,194 +1,193 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import logging
18 18 import os
19 19 import pkg_resources
20 20
21 21 from datetime import datetime, timedelta
22 22
23 23 import psutil
24 24 import redis
25 25
26 26 from pyramid.view import view_config
27 27 from appenlight.models import DBSession
28 28 from appenlight.models import Datastores
29 29 from appenlight.lib.redis_keys import REDIS_KEYS
30 30
31 31
32 32 def bytes2human(total):
33 33 giga = 1024.0 ** 3
34 34 mega = 1024.0 ** 2
35 35 kilo = 1024.0
36 36 if giga <= total:
37 37 return '{:0.1f}G'.format(total / giga)
38 38 elif mega <= total:
39 39 return '{:0.1f}M'.format(total / mega)
40 40 else:
41 41 return '{:0.1f}K'.format(total / kilo)
42 42
43 43
44 44 log = logging.getLogger(__name__)
45 45
46 46
47 47 @view_config(route_name='section_view',
48 48 match_param=['section=admin_section', 'view=system'],
49 49 renderer='json', permission='root_administration')
50 50 def system(request):
51 51 current_time = datetime.utcnow(). \
52 52 replace(second=0, microsecond=0) - timedelta(minutes=1)
53 53 # global app counter
54 54 processed_reports = request.registry.redis_conn.get(
55 55 REDIS_KEYS['counters']['reports_per_minute'].format(current_time))
56 56 processed_reports = int(processed_reports) if processed_reports else 0
57 57 processed_logs = request.registry.redis_conn.get(
58 58 REDIS_KEYS['counters']['logs_per_minute'].format(current_time))
59 59 processed_logs = int(processed_logs) if processed_logs else 0
60 60 processed_metrics = request.registry.redis_conn.get(
61 61 REDIS_KEYS['counters']['metrics_per_minute'].format(current_time))
62 62 processed_metrics = int(processed_metrics) if processed_metrics else 0
63 63
64 64 waiting_reports = 0
65 65 waiting_logs = 0
66 66 waiting_metrics = 0
67 67 waiting_other = 0
68 68
69 69 if 'redis' in request.registry.settings['celery.broker_type']:
70 70 redis_client = redis.StrictRedis.from_url(
71 71 request.registry.settings['celery.broker_url'])
72 72 waiting_reports = redis_client.llen('reports')
73 73 waiting_logs = redis_client.llen('logs')
74 74 waiting_metrics = redis_client.llen('metrics')
75 75 waiting_other = redis_client.llen('default')
76 76
77 77 # process
78 78 def replace_inf(val):
79 79 return val if val != psutil.RLIM_INFINITY else 'unlimited'
80 80
81 81 p = psutil.Process()
82 82 fd = p.rlimit(psutil.RLIMIT_NOFILE)
83 83 memlock = p.rlimit(psutil.RLIMIT_MEMLOCK)
84 84 self_info = {
85 85 'fds': {'soft': replace_inf(fd[0]),
86 86 'hard': replace_inf(fd[1])},
87 87 'memlock': {'soft': replace_inf(memlock[0]),
88 88 'hard': replace_inf(memlock[1])},
89 89 }
90 90
91 91 # disks
92 92 disks = []
93 93 for part in psutil.disk_partitions(all=False):
94 94 if os.name == 'nt':
95 95 if 'cdrom' in part.opts or part.fstype == '':
96 96 continue
97 97 usage = psutil.disk_usage(part.mountpoint)
98 98 disks.append({
99 99 'device': part.device,
100 100 'total': bytes2human(usage.total),
101 101 'used': bytes2human(usage.used),
102 102 'free': bytes2human(usage.free),
103 103 'percentage': int(usage.percent),
104 104 'mountpoint': part.mountpoint,
105 105 'fstype': part.fstype
106 106 })
107 107
108 108 # memory
109 109 memory_v = psutil.virtual_memory()
110 110 memory_s = psutil.swap_memory()
111 111
112 112 memory = {
113 113 'total': bytes2human(memory_v.total),
114 114 'available': bytes2human(memory_v.available),
115 115 'percentage': memory_v.percent,
116 116 'used': bytes2human(memory_v.used),
117 117 'free': bytes2human(memory_v.free),
118 118 'active': bytes2human(memory_v.active),
119 119 'inactive': bytes2human(memory_v.inactive),
120 120 'buffers': bytes2human(memory_v.buffers),
121 121 'cached': bytes2human(memory_v.cached),
122 122 'swap_total': bytes2human(memory_s.total),
123 123 'swap_used': bytes2human(memory_s.used)
124 124 }
125 125
126 126 # load
127 127 system_load = os.getloadavg()
128 128
129 129 # processes
130 130 min_mem = 1024 * 1024 * 40 # 40MB
131 131 process_info = []
132 132 for p in psutil.process_iter():
133 133 mem_used = p.get_memory_info().rss
134 134 if mem_used < min_mem:
135 135 continue
136 136 process_info.append({'owner': p.username(),
137 137 'pid': p.pid,
138 138 'cpu': round(p.get_cpu_percent(interval=0), 1),
139 139 'mem_percentage': round(p.get_memory_percent(),
140 140 1),
141 141 'mem_usage': bytes2human(mem_used),
142 142 'name': p.name(),
143 143 'command': ' '.join(p.cmdline())
144 144 })
145 145 process_info = sorted(process_info, key=lambda x: x['mem_percentage'],
146 146 reverse=True)
147 147
148 148 # pg tables
149 149
150 150 db_size_query = '''
151 151 SELECT tablename, pg_total_relation_size(tablename::text) size
152 152 FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
153 153 tablename NOT LIKE 'sql_%' ORDER BY size DESC;'''
154 154
155 155 db_tables = []
156 156 for row in DBSession.execute(db_size_query):
157 157 db_tables.append({"size_human": bytes2human(row.size),
158 158 "table_name": row.tablename})
159 159
160 160 # es indices
161 161 es_indices = []
162 result = Datastores.es.send_request(
163 'GET', ['_stats', 'store, docs'], query_params={})
162 result = Datastores.es.indices.stats(metric=['store', 'docs'])
164 163 for ix, stats in result['indices'].items():
165 164 size = stats['primaries']['store']['size_in_bytes']
166 165 es_indices.append({'name': ix,
167 166 'size': size,
168 167 'size_human': bytes2human(size)})
169 168
170 169 # packages
171 170
172 171 packages = ({'name': p.project_name, 'version': p.version}
173 172 for p in pkg_resources.working_set)
174 173
175 174 return {'db_tables': db_tables,
176 175 'es_indices': sorted(es_indices,
177 176 key=lambda x: x['size'], reverse=True),
178 177 'process_info': process_info,
179 178 'system_load': system_load,
180 179 'disks': disks,
181 180 'memory': memory,
182 181 'packages': sorted(packages, key=lambda x: x['name'].lower()),
183 182 'current_time': current_time,
184 183 'queue_stats': {
185 184 'processed_reports': processed_reports,
186 185 'processed_logs': processed_logs,
187 186 'processed_metrics': processed_metrics,
188 187 'waiting_reports': waiting_reports,
189 188 'waiting_logs': waiting_logs,
190 189 'waiting_metrics': waiting_metrics,
191 190 'waiting_other': waiting_other
192 191 },
193 192 'self_info': self_info
194 193 }
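The index size listing above now goes through the client's indices API instead of a raw send_request call. A short sketch of reading per-index store sizes the same way, with an inline size formatter standing in for bytes2human; the client setup is an assumption:

from elasticsearch import Elasticsearch

es = Elasticsearch()  # assumed local node

stats = es.indices.stats(metric=['store', 'docs'])
for name, data in stats['indices'].items():
    size = data['primaries']['store']['size_in_bytes']
    # rough stand-in for bytes2human
    print(name, '{:0.1f}M'.format(size / 1024.0 ** 2))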
@@ -1,148 +1,148 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from pyramid.view import view_config
18 18 from appenlight.models import DBSession, Datastores
19 19 from appenlight.forms import get_partition_deletion_form
20 20
21 21 import logging
22 22
23 23 from zope.sqlalchemy import mark_changed
24 24 from datetime import datetime
25 25 import sqlalchemy as sa
26 26
27 27 log = logging.getLogger(__name__)
28 28
29 29
30 30 def get_partition_stats():
31 31 table_query = """
32 32 SELECT table_name
33 33 FROM information_schema.tables
34 34 GROUP BY table_name
35 35 ORDER BY table_name
36 36 """
37 37
38 38 permanent_partitions = {}
39 39 daily_partitions = {}
40 40
41 41 def is_int(data):
42 42 try:
43 43 int(data)
44 44 return True
45 45 except Exception:
46 46 pass
47 47 return False
48 48
49 49 def add_key(key, holder):
50 50 if ix_time not in holder:
51 51 holder[ix_time] = {'pg': [], 'elasticsearch': []}
52 52
53 for partition in list(Datastores.es.aliases().keys()):
53 for partition in list(Datastores.es.indices.get_alias('rcae*')):
54 54 if not partition.startswith('rcae'):
55 55 continue
56 56 split_data = partition.split('_')
57 57 permanent = False
58 58 # if we don't have a day then treat it as a permanent partition
59 59 if False in list(map(is_int, split_data[-3:])):
60 60 ix_time = datetime(year=int(split_data[-2]),
61 61 month=int(split_data[-1]),
62 62 day=1).date()
63 63 permanent = True
64 64 else:
65 65 ix_time = datetime(year=int(split_data[-3]),
66 66 month=int(split_data[-2]),
67 67 day=int(split_data[-1])).date()
68 68
69 69 ix_time = str(ix_time)
70 70 if permanent:
71 71 add_key(ix_time, permanent_partitions)
72 72 if ix_time not in permanent_partitions:
73 73 permanent_partitions[ix_time]['elasticsearch'] = []
74 74 permanent_partitions[ix_time]['elasticsearch'].append(partition)
75 75 else:
76 76 add_key(ix_time, daily_partitions)
77 77 if ix_time not in daily_partitions:
78 78 daily_partitions[ix_time]['elasticsearch'] = []
79 79 daily_partitions[ix_time]['elasticsearch'].append(partition)
80 80
81 81 for row in DBSession.execute(table_query):
82 82 splitted = row['table_name'].split('_')
83 83 if 'p' in splitted:
84 84 # dealing with partition
85 85 split_data = [int(x) for x in splitted[splitted.index('p') + 1:]]
86 86 if len(split_data) == 3:
87 87 ix_time = datetime(split_data[0], split_data[1],
88 88 split_data[2]).date()
89 89 ix_time = str(ix_time)
90 90 add_key(ix_time, daily_partitions)
91 91 daily_partitions[ix_time]['pg'].append(row['table_name'])
92 92 else:
93 93 ix_time = datetime(split_data[0], split_data[1], 1).date()
94 94 ix_time = str(ix_time)
95 95 add_key(ix_time, permanent_partitions)
96 96 permanent_partitions[ix_time]['pg'].append(row['table_name'])
97 97
98 98 return permanent_partitions, daily_partitions
99 99
100 100
101 101 @view_config(route_name='section_view', permission='root_administration',
102 102 match_param=['section=admin_section', 'view=partitions'],
103 103 renderer='json', request_method='GET')
104 104 def index(request):
105 105 permanent_partitions, daily_partitions = get_partition_stats()
106 106
107 107 return {"permanent_partitions": sorted(list(permanent_partitions.items()),
108 108 key=lambda x: x[0], reverse=True),
109 109 "daily_partitions": sorted(list(daily_partitions.items()),
110 110 key=lambda x: x[0], reverse=True)}
111 111
112 112
113 113 @view_config(route_name='section_view', request_method='POST',
114 114 match_param=['section=admin_section', 'view=partitions_remove'],
115 115 renderer='json', permission='root_administration')
116 116 def partitions_remove(request):
117 117 permanent_partitions, daily_partitions = get_partition_stats()
118 118 pg_partitions = []
119 119 es_partitions = []
120 120 for item in list(permanent_partitions.values()) + list(daily_partitions.values()):
121 121 es_partitions.extend(item['elasticsearch'])
122 122 pg_partitions.extend(item['pg'])
123 123 FormCls = get_partition_deletion_form(es_partitions, pg_partitions)
124 124 form = FormCls(es_index=request.unsafe_json_body['es_indices'],
125 125 pg_index=request.unsafe_json_body['pg_indices'],
126 126 confirm=request.unsafe_json_body['confirm'],
127 127 csrf_context=request)
128 128 if form.validate():
129 129 for ix in form.data['es_index']:
130 130 log.warning('deleting ES partition: {}'.format(ix))
131 Datastores.es.delete_index(ix)
131 Datastores.es.indices.delete(ix)
132 132 for ix in form.data['pg_index']:
133 133 log.warning('deleting PG partition: {}'.format(ix))
134 134 stmt = sa.text('DROP TABLE %s CASCADE' % sa.text(ix))
135 135 session = DBSession()
136 136 session.connection().execute(stmt)
137 137 mark_changed(session)
138 138
139 139 for field, error in form.errors.items():
140 140 msg = '%s: %s' % (field, error[0])
141 141 request.session.flash(msg, 'error')
142 142
143 143 permanent_partitions, daily_partitions = get_partition_stats()
144 144 return {
145 145 "permanent_partitions": sorted(
146 146 list(permanent_partitions.items()), key=lambda x: x[0], reverse=True),
147 147 "daily_partitions": sorted(
148 148 list(daily_partitions.items()), key=lambda x: x[0], reverse=True)}
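Migration note: this file replaces two more pyelasticsearch calls - aliases() becomes indices.get_alias() and delete_index() becomes indices.delete(). A minimal sketch of both calls follows, under the same assumptions (elasticsearch-py 2.x, cluster at localhost:9200, indices prefixed 'rcae'); the date-suffix cutoff check is hypothetical and only there to show a guarded delete.

from elasticsearch import Elasticsearch

es = Elasticsearch(['http://localhost:9200'])  # illustrative host, not from the diff

# keys of the get_alias() response are the matching index names
partitions = list(es.indices.get_alias('rcae*'))

for index_name in partitions:
    # hypothetical guard - the real view only deletes the indices the admin
    # explicitly selected in the deletion form
    if index_name.endswith('_2016_01_01'):
        es.indices.delete(index=index_name)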
@@ -1,224 +1,224 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import logging
18 18 from datetime import datetime, timedelta
19 19
20 20 from pyramid.view import view_config
21 21 from pyramid.httpexceptions import HTTPUnprocessableEntity
22 22 from appenlight.models import Datastores, Log
23 23 from appenlight.models.services.log import LogService
24 24 from appenlight.lib.utils import (build_filter_settings_from_query_dict,
25 25 es_index_name_limiter)
26 26 from appenlight.lib.helpers import gen_pagination_headers
27 27 from appenlight.celery.tasks import logs_cleanup
28 28
29 29 log = logging.getLogger(__name__)
30 30
31 31 section_filters_key = 'appenlight:logs:filter:%s'
32 32
33 33
34 34 @view_config(route_name='logs_no_id', renderer='json',
35 35 permission='authenticated')
36 36 def fetch_logs(request):
37 37 """
38 38 Returns list of log entries from Elasticsearch
39 39 """
40 40
41 41 filter_settings = build_filter_settings_from_query_dict(request,
42 42 request.GET.mixed())
43 43 logs_paginator = LogService.get_paginator_by_app_ids(
44 44 app_ids=filter_settings['resource'],
45 45 page=filter_settings['page'],
46 46 filter_settings=filter_settings
47 47 )
48 48 headers = gen_pagination_headers(request, logs_paginator)
49 49 request.response.headers.update(headers)
50 50
51 51 return [l.get_dict() for l in logs_paginator.sa_items]
52 52
53 53
54 54 @view_config(route_name='section_view',
55 55 match_param=['section=logs_section', 'view=fetch_series'],
56 56 renderer='json', permission='authenticated')
57 57 def logs_fetch_series(request):
58 58 """
59 59 Handles metric dashboard graphs
60 60 Returns information for time/tier breakdown
61 61 """
62 62 filter_settings = build_filter_settings_from_query_dict(request,
63 63 request.GET.mixed())
64 64 paginator = LogService.get_paginator_by_app_ids(
65 65 app_ids=filter_settings['resource'],
66 66 page=1, filter_settings=filter_settings, items_per_page=1)
67 67 now = datetime.utcnow().replace(microsecond=0, second=0)
68 68 delta = timedelta(days=7)
69 69 if paginator.sa_items:
70 70 start_date = paginator.sa_items[-1].timestamp.replace(microsecond=0,
71 71 second=0)
72 72 filter_settings['start_date'] = start_date - delta
73 73 else:
74 74 filter_settings['start_date'] = now - delta
75 75 filter_settings['end_date'] = filter_settings['start_date'] \
76 76 + timedelta(days=7)
77 77
78 78 @request.registry.cache_regions.redis_sec_30.cache_on_arguments(
79 79 'logs_graphs')
80 80 def cached(apps, search_params, delta, now):
81 81 data = LogService.get_time_series_aggregate(
82 82 filter_settings['resource'], filter_settings)
83 83 if not data:
84 84 return []
85 85 buckets = data['aggregations']['events_over_time']['buckets']
86 86 return [{"x": datetime.utcfromtimestamp(item["key"] / 1000),
87 87 "logs": item["doc_count"]} for item in buckets]
88 88
89 89 return cached(filter_settings, request.GET.mixed(), delta, now)
90 90
91 91
92 92 @view_config(route_name='logs_no_id', renderer='json', request_method="DELETE",
93 93 permission='authenticated')
94 94 def logs_mass_delete(request):
95 95 params = request.GET.mixed()
96 96 if 'resource' not in params:
97 97 raise HTTPUnprocessableEntity()
98 98 # this might be '' and then colander will not validate the schema
99 99 if not params.get('namespace'):
100 100 params.pop('namespace', None)
101 101 filter_settings = build_filter_settings_from_query_dict(
102 102 request, params, resource_permissions=['update_reports'])
103 103
104 104 resource_id = list(filter_settings['resource'])[0]
105 105 # filter settings returns a list of all of the user's applications
106 106 # if the app does not match - normally we would not care, as it's used for search,
107 107 # but here a user playing with params could possibly wipe out their whole data
108 108 if int(resource_id) != int(params['resource']):
109 109 raise HTTPUnprocessableEntity()
110 110
111 111 logs_cleanup.delay(resource_id, filter_settings)
112 112 msg = 'Log cleanup process started - it may take a while for ' \
113 113 'everything to get removed'
114 114 request.session.flash(msg)
115 115 return {}
116 116
117 117
118 118 @view_config(route_name='section_view',
119 119 match_param=("view=common_tags", "section=logs_section"),
120 120 renderer='json', permission='authenticated')
121 121 def common_tags(request):
122 122 config = request.GET.mixed()
123 123 filter_settings = build_filter_settings_from_query_dict(request,
124 124 config)
125 125
126 126 resources = list(filter_settings["resource"])
127 127 query = {
128 128 "query": {
129 129 "filtered": {
130 130 "filter": {
131 131 "and": [{"terms": {"resource_id": list(resources)}}]
132 132 }
133 133 }
134 134 }
135 135 }
136 136 start_date = filter_settings.get('start_date')
137 137 end_date = filter_settings.get('end_date')
138 138 filter_part = query['query']['filtered']['filter']['and']
139 139
140 140 date_range = {"range": {"timestamp": {}}}
141 141 if start_date:
142 142 date_range["range"]["timestamp"]["gte"] = start_date
143 143 if end_date:
144 144 date_range["range"]["timestamp"]["lte"] = end_date
145 145 if start_date or end_date:
146 146 filter_part.append(date_range)
147 147
148 148 levels = filter_settings.get('level')
149 149 if levels:
150 150 filter_part.append({"terms": {'log_level': levels}})
151 151 namespaces = filter_settings.get('namespace')
152 152 if namespaces:
153 153 filter_part.append({"terms": {'namespace': namespaces}})
154 154
155 155 query["aggs"] = {
156 156 "sub_agg": {
157 157 "terms": {
158 158 "field": "tag_list",
159 159 "size": 50
160 160 }
161 161 }
162 162 }
163 163 # tags
164 164 index_names = es_index_name_limiter(
165 165 ixtypes=[config.get('datasource', 'logs')])
166 result = Datastores.es.search(query, index=index_names, doc_type='log',
166 result = Datastores.es.search(body=query, index=index_names, doc_type='log',
167 167 size=0)
168 168 tag_buckets = result['aggregations']['sub_agg'].get('buckets', [])
169 169 # namespaces
170 170 query["aggs"] = {
171 171 "sub_agg": {
172 172 "terms": {
173 173 "field": "namespace",
174 174 "size": 50
175 175 }
176 176 }
177 177 }
178 result = Datastores.es.search(query, index=index_names, doc_type='log',
178 result = Datastores.es.search(body=query, index=index_names, doc_type='log',
179 179 size=0)
180 180 namespaces_buckets = result['aggregations']['sub_agg'].get('buckets', [])
181 181 return {
182 182 "tags": [item['key'] for item in tag_buckets],
183 183 "namespaces": [item['key'] for item in namespaces_buckets]
184 184 }
185 185
186 186
187 187 @view_config(route_name='section_view',
188 188 match_param=("view=common_values", "section=logs_section"),
189 189 renderer='json', permission='authenticated')
190 190 def common_values(request):
191 191 config = request.GET.mixed()
192 192 datasource = config.pop('datasource', 'logs')
193 193 filter_settings = build_filter_settings_from_query_dict(request,
194 194 config)
195 195 resources = list(filter_settings["resource"])
196 196 tag_name = filter_settings['tags'][0]['value'][0]
197 197 query = {
198 198 'query': {
199 199 'filtered': {
200 200 'filter': {
201 201 'and': [
202 202 {'terms': {'resource_id': list(resources)}},
203 203 {'terms': {
204 204 'namespace': filter_settings['namespace']}}
205 205 ]
206 206 }
207 207 }
208 208 }
209 209 }
210 210 query['aggs'] = {
211 211 'sub_agg': {
212 212 'terms': {
213 213 'field': 'tags.{}.values'.format(tag_name),
214 214 'size': 50
215 215 }
216 216 }
217 217 }
218 218 index_names = es_index_name_limiter(ixtypes=[datasource])
219 result = Datastores.es.search(query, index=index_names, doc_type='log',
219 result = Datastores.es.search(body=query, index=index_names, doc_type='log',
220 220 size=0)
221 221 values_buckets = result['aggregations']['sub_agg'].get('buckets', [])
222 222 return {
223 223 "values": [item['key'] for item in values_buckets]
224 224 }
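Migration note: with elasticsearch-py the query document is passed via the body= keyword of search(), while size=0 stays a plain query parameter - the aggregation-only pattern used by common_tags() and common_values() above. A minimal sketch follows, assuming elasticsearch-py 2.x against an ES 2.x cluster at localhost:9200; the index name 'rcae_l_2017_01' and the resource_id value are placeholders, not taken from the diff.

from elasticsearch import Elasticsearch

es = Elasticsearch(['http://localhost:9200'])  # illustrative host, not from the diff

query = {
    'query': {
        'filtered': {
            'filter': {'and': [{'terms': {'resource_id': [1]}}]}
        }
    },
    'aggs': {
        'sub_agg': {'terms': {'field': 'tag_list', 'size': 50}}
    }
}

# body= carries the query document; size=0 drops the hits and keeps only
# the aggregation buckets
result = es.search(body=query, index='rcae_l_2017_01', doc_type='log', size=0)
buckets = result['aggregations']['sub_agg'].get('buckets', [])
print([item['key'] for item in buckets])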