channelstream: separate element and basic pubsub
ergo

The requested changes are too big and the content was truncated. Newly added files (mode 100644) are shown without content.
@@ -1,495 +1,495 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# AppEnlight Enterprise Edition, including its added features, Support
# services, and proprietary license terms, please see
# https://rhodecode.com/licenses/

"""
Utility functions.
"""
import logging
import requests
import hashlib
import json
import copy
import uuid
import appenlight.lib.helpers as h
from collections import namedtuple
from datetime import timedelta, datetime, date
from dogpile.cache.api import NO_VALUE
from appenlight.models import Datastores
from appenlight.validators import (LogSearchSchema,
                                   TagListSchema,
                                   accepted_search_params)
from itsdangerous import TimestampSigner
from ziggurat_foundations.permissions import ALL_PERMISSIONS
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrule, MONTHLY, DAILY

log = logging.getLogger(__name__)


Stat = namedtuple('Stat', 'start_interval value')


def default_extractor(item):
    """
    :param item - item to extract date from
    """
    if hasattr(item, 'start_interval'):
        return item.start_interval
    return item['start_interval']


# fast gap generator
def gap_gen_default(start, step, itemiterator, end_time=None,
                    iv_extractor=None):
    """ generates a list of time/value items based on step and itemiterator
    if there are entries missing from the iterator, time/None will be returned
    instead
    :param start - datetime - what time should we start generating our values
    :param step - timedelta - stepsize
    :param itemiterator - iterable - we will check this iterable for values
    corresponding to generated steps
    :param end_time - datetime - when last step is >= end_time stop iterating
    :param iv_extractor - extracts current step from iterable items
    """

    if not iv_extractor:
        iv_extractor = default_extractor

    next_step = start
    minutes = step.total_seconds() / 60.0
    while next_step.minute % minutes != 0:
        next_step = next_step.replace(minute=next_step.minute - 1)
    for item in itemiterator:
        item_start_interval = iv_extractor(item)
        # do we have a match for the current time step in our data?
        # if not, generate a new tuple with an empty (None) value
        while next_step < item_start_interval:
            yield Stat(next_step, None)
            next_step = next_step + step
        if next_step == item_start_interval:
            yield Stat(item_start_interval, item)
            next_step = next_step + step
    if end_time:
        while next_step < end_time:
            yield Stat(next_step, None)
            next_step = next_step + step
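A minimal sketch of how the gap generator behaves (illustrative only, not part of the file): two rows fifteen minutes apart, stepped at 5-minute intervals, yield None placeholders for the missing buckets.

from datetime import datetime, timedelta

rows = [{'start_interval': datetime(2016, 5, 1, 10, 0), 'value': 3},
        {'start_interval': datetime(2016, 5, 1, 10, 15), 'value': 7}]
series = list(gap_gen_default(datetime(2016, 5, 1, 10, 0),
                              timedelta(minutes=5), rows))
# -> Stat(10:00, row), Stat(10:05, None), Stat(10:10, None), Stat(10:15, row)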
class DateTimeEncoder(json.JSONEncoder):
    """ Simple datetime to ISO encoder for json serialization"""

    def default(self, obj):
        if isinstance(obj, date):
            return obj.isoformat()
        if isinstance(obj, datetime):
            return obj.isoformat()
        return json.JSONEncoder.default(self, obj)
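For example, DateTimeEncoder lets payloads containing datetimes be serialized directly:

import json
from datetime import datetime

json.dumps({'timestamp': datetime(2016, 5, 1, 10, 0)}, cls=DateTimeEncoder)
# '{"timestamp": "2016-05-01T10:00:00"}'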
-def cometd_request(secret, endpoint, payload, throw_exceptions=False,
+def channelstream_request(secret, endpoint, payload, throw_exceptions=False,
                          servers=None):
    responses = []
    if not servers:
        servers = []

    signer = TimestampSigner(secret)
    sig_for_server = signer.sign(endpoint)
    for secret, server in [(s['secret'], s['server']) for s in servers]:
        response = {}
        secret_headers = {'x-channelstream-secret': sig_for_server,
                          'x-channelstream-endpoint': endpoint,
                          'Content-Type': 'application/json'}
        url = '%s%s' % (server, endpoint)
        try:
            response = requests.post(url,
                                     data=json.dumps(payload,
                                                     cls=DateTimeEncoder),
                                     headers=secret_headers,
                                     verify=False,
                                     timeout=2).json()
        except requests.exceptions.RequestException as e:
            if throw_exceptions:
                raise
        responses.append(response)
    return responses
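A hedged usage sketch for the renamed helper: publishing a single message through one channelstream server. The server address, secret, and channel name below are made-up placeholders, and the '/message' endpoint plus payload shape are assumptions based on channelstream's pub/sub API rather than anything verified in this changeset.

# assumed configuration - real values come from the application settings
servers = [{'server': 'http://127.0.0.1:8000', 'secret': 'secret'}]
payload = {'type': 'message',
           'user': 'system',
           'channel': 'pub_chan:test',
           'message': {'text': 'hello'}}
responses = channelstream_request('secret', '/message', [payload],
                                  servers=servers)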
def add_cors_headers(response):
    # allow CORS
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('XDomainRequestAllowed', '1')
    response.headers.add('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
    # response.headers.add('Access-Control-Allow-Credentials', 'true')
    response.headers.add('Access-Control-Allow-Headers',
                         'Content-Type, Depth, User-Agent, X-File-Size, X-Requested-With, If-Modified-Since, X-File-Name, Cache-Control, Pragma, Origin, Connection, Referer, Cookie')
    response.headers.add('Access-Control-Max-Age', '86400')


from sqlalchemy.sql import compiler
from psycopg2.extensions import adapt as sqlescape


# or use the appropriate escape function from your db driver

def compile_query(query):
    dialect = query.session.bind.dialect
    statement = query.statement
    comp = compiler.SQLCompiler(dialect, statement)
    comp.compile()
    enc = dialect.encoding
    params = {}
    for k, v in comp.params.items():
        if isinstance(v, str):
            v = v.encode(enc)
        params[k] = sqlescape(v)
    return (comp.string.encode(enc) % params).decode(enc)


def convert_es_type(input_data):
    """
    This might need to convert some text or other types to corresponding ES types
    """
    return str(input_data)


ProtoVersion = namedtuple('ProtoVersion', ['major', 'minor', 'patch'])


def parse_proto(input_data):
    try:
        parts = [int(x) for x in input_data.split('.')]
        while len(parts) < 3:
            parts.append(0)
        return ProtoVersion(*parts)
    except Exception as e:
        log.info('Unknown protocol version: %s' % e)
    return ProtoVersion(99, 99, 99)
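For instance, incomplete version strings are padded with zeros and bad input falls back to the sentinel version:

parse_proto('0.5')      # ProtoVersion(major=0, minor=5, patch=0)
parse_proto('0.5.1')    # ProtoVersion(major=0, minor=5, patch=1)
parse_proto('unknown')  # ProtoVersion(major=99, minor=99, patch=99)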
def es_index_name_limiter(start_date=None, end_date=None, months_in_past=6,
                          ixtypes=None):
    """
    This function limits the search to 6 months by default so we don't have to
    query 300 elasticsearch indices for 20 years of historical data for example
    """

    # should be cached later
    def get_possible_names():
        return list(Datastores.es.aliases().keys())

    possible_names = get_possible_names()
    es_index_types = []
    if not ixtypes:
        ixtypes = ['reports', 'metrics', 'logs']
    for t in ixtypes:
        if t == 'reports':
            es_index_types.append('rcae_r_%s')
        elif t == 'logs':
            es_index_types.append('rcae_l_%s')
        elif t == 'metrics':
            es_index_types.append('rcae_m_%s')
        elif t == 'uptime':
            es_index_types.append('rcae_u_%s')
        elif t == 'slow_calls':
            es_index_types.append('rcae_sc_%s')

    if start_date:
        start_date = copy.copy(start_date)
    else:
        if not end_date:
            end_date = datetime.utcnow()
        start_date = end_date + relativedelta(months=months_in_past * -1)

    if not end_date:
        end_date = start_date + relativedelta(months=months_in_past)

    index_dates = list(rrule(MONTHLY,
                             dtstart=start_date.date().replace(day=1),
                             until=end_date.date(),
                             count=36))
    index_names = []
    for ix_type in es_index_types:
        to_extend = [ix_type % d.strftime('%Y_%m') for d in index_dates
                     if ix_type % d.strftime('%Y_%m') in possible_names]
        index_names.extend(to_extend)
        for day in list(rrule(DAILY, dtstart=start_date.date(),
                              until=end_date.date(), count=366)):
            ix_name = ix_type % day.strftime('%Y_%m_%d')
            if ix_name in possible_names:
                index_names.append(ix_name)
    return index_names
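The index name templates above expand to monthly names (and per-day names in the daily loop); a quick illustration of the naming pattern, kept only when the resulting alias actually exists in ES:

'rcae_r_%s' % datetime(2016, 5, 1).strftime('%Y_%m')     # 'rcae_r_2016_05'
'rcae_l_%s' % datetime(2016, 5, 1).strftime('%Y_%m_%d')  # 'rcae_l_2016_05_01'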
def build_filter_settings_from_query_dict(
        request, params=None, override_app_ids=None,
        resource_permissions=None):
    """
    Builds list of normalized search terms for ES from query params
    ensuring application list is restricted to only applications user
    has access to

    :param params (dictionary)
    :param override_app_ids - list of application ids to use instead of
    applications user normally has access to
    """
    params = copy.deepcopy(params)
    applications = []
    if not resource_permissions:
        resource_permissions = ['view']

    if request.user:
        applications = request.user.resources_with_perms(
            resource_permissions, resource_types=['application'])

    # CRITICAL - this ensures our resultset is limited to only the ones
    # user has view permissions
    all_possible_app_ids = set([app.resource_id for app in applications])

    # if override is present we force permission for app to be present
    # this allows users to see dashboards and applications they would
    # normally not be able to

    if override_app_ids:
        all_possible_app_ids = set(override_app_ids)

    schema = LogSearchSchema().bind(resources=all_possible_app_ids)
    tag_schema = TagListSchema()
    filter_settings = schema.deserialize(params)
    tag_list = []
    for k, v in list(filter_settings.items()):
        if k in accepted_search_params:
            continue
        tag_list.append({"name": k, "value": v, "op": 'eq'})
        # remove the key from filter_settings
        filter_settings.pop(k, None)
    tags = tag_schema.deserialize(tag_list)
    filter_settings['tags'] = tags
    return filter_settings


def gen_uuid():
    return str(uuid.uuid4())


def gen_uuid4_sha_hex():
    return hashlib.sha1(uuid.uuid4().bytes).hexdigest()


def permission_tuple_to_dict(data):
    out = {
        "user_name": None,
        "perm_name": data.perm_name,
        "owner": data.owner,
        "type": data.type,
        "resource_name": None,
        "resource_type": None,
        "resource_id": None,
        "group_name": None,
        "group_id": None
    }
    if data.user:
        out["user_name"] = data.user.user_name
    if data.perm_name == ALL_PERMISSIONS:
        out['perm_name'] = '__all_permissions__'
    if data.resource:
        out['resource_name'] = data.resource.resource_name
        out['resource_type'] = data.resource.resource_type
        out['resource_id'] = data.resource.resource_id
    if data.group:
        out['group_name'] = data.group.group_name
        out['group_id'] = data.group.id
    return out


def get_cached_buckets(request, stats_since, end_time, fn, cache_key,
                       gap_gen=None, db_session=None, step_interval=None,
                       iv_extractor=None,
                       rerange=False, *args, **kwargs):
    """ Takes "fn" that should return some data and tries to load the data
    dividing it into daily buckets - if the stats_since and end time give a
    delta bigger than 24 hours, then only "today's" data is computed on the fly

    :param request: (request) request object
    :param stats_since: (datetime) start date of buckets range
    :param end_time: (datetime) end date of buckets range - utcnow() if None
    :param fn: (callable) callable to use to populate buckets should have
    following signature:
        def get_data(request, since_when, until, *args, **kwargs):

    :param cache_key: (string) cache key that will be used to build bucket
    caches
    :param gap_gen: (callable) gap generator - should return step intervals
    to use with our `fn` callable
    :param db_session: (Session) sqlalchemy session
    :param step_interval: (timedelta) optional step interval if we want to
    override the default determined from total start/end time delta
    :param iv_extractor: (callable) used to get step intervals from data
    returned by `fn` callable
    :param rerange: (bool) handy if we want to change ranges from hours to
    days when cached data is missing - will shorten execution time if `fn`
    callable supports that and we are working with multiple rows - like metrics
    :param args:
    :param kwargs:

    :return: iterable
    """
    if not end_time:
        end_time = datetime.utcnow().replace(second=0, microsecond=0)
    delta = end_time - stats_since
    # if smaller than 3 days we want to group by 5min else by 1h,
    # for 60 min group by min
    if not gap_gen:
        gap_gen = gap_gen_default
    if not iv_extractor:
        iv_extractor = default_extractor

    # do not use custom interval if total time range with new iv would exceed
    # end time
    if not step_interval or stats_since + step_interval >= end_time:
        if delta < h.time_deltas.get('12h')['delta']:
            step_interval = timedelta(seconds=60)
        elif delta < h.time_deltas.get('3d')['delta']:
            step_interval = timedelta(seconds=60 * 5)
        elif delta > h.time_deltas.get('2w')['delta']:
            step_interval = timedelta(days=1)
        else:
            step_interval = timedelta(minutes=60)

    if step_interval >= timedelta(minutes=60):
        log.info('cached_buckets:{}: adjusting start time '
                 'for hourly or daily intervals'.format(cache_key))
        stats_since = stats_since.replace(hour=0, minute=0)

    ranges = [i.start_interval for i in list(gap_gen(stats_since,
                                                     step_interval, [],
                                                     end_time=end_time))]
    buckets = {}
    storage_key = 'buckets:' + cache_key + '{}|{}'
    # this means we basically cache per hour in 3-14 day intervals but i think
    # it's fine at this point - will be faster than db access anyways

    if len(ranges) >= 1:
        last_ranges = [ranges[-1]]
    else:
        last_ranges = []
    if step_interval >= timedelta(minutes=60):
        for r in ranges:
            k = storage_key.format(step_interval.total_seconds(), r)
            value = request.registry.cache_regions.redis_day_30.get(k)
            # last buckets are never loaded from cache
            is_last_result = (
                r >= end_time - timedelta(hours=6) or r in last_ranges)
            if value is not NO_VALUE and not is_last_result:
                log.info("cached_buckets:{}: "
                         "loading range {} from cache".format(cache_key, r))
                buckets[r] = value
            else:
                log.info("cached_buckets:{}: "
                         "loading range {} from storage".format(cache_key, r))
                range_size = step_interval
                if (step_interval == timedelta(minutes=60) and
                        not is_last_result and rerange):
                    range_size = timedelta(days=1)
                    r = r.replace(hour=0, minute=0)
                    log.info("cached_buckets:{}: "
                             "loading collapsed "
                             "range {} {}".format(cache_key, r,
                                                  r + range_size))
                bucket_data = fn(
                    request, r, r + range_size, step_interval,
                    gap_gen, bucket_count=len(ranges), *args, **kwargs)
                for b in bucket_data:
                    b_iv = iv_extractor(b)
                    buckets[b_iv] = b
                    k2 = storage_key.format(
                        step_interval.total_seconds(), b_iv)
                    request.registry.cache_regions.redis_day_30.set(k2, b)
                log.info("cached_buckets:{}: saving cache".format(cache_key))
    else:
        # bucket count is 1 for short time ranges <= 24h from now
        bucket_data = fn(request, stats_since, end_time, step_interval,
                         gap_gen, bucket_count=1, *args, **kwargs)
        for b in bucket_data:
            buckets[iv_extractor(b)] = b
    return buckets
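A caller sketch, assuming a hypothetical count_reports callable with the signature described in the docstring and a request object whose registry has the redis cache regions configured:

def count_reports(request, since_when, until, step_interval, gap_gen,
                  bucket_count=None, **kwargs):
    # hypothetical: return one dict per time bucket between since_when and
    # until, each carrying a 'start_interval' key for the extractor
    return []

buckets = get_cached_buckets(
    request, stats_since=datetime.utcnow() - timedelta(days=7),
    end_time=None, fn=count_reports, cache_key='reports_per_app:1')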


def get_cached_split_data(request, stats_since, end_time, fn, cache_key,
                          db_session=None, *args, **kwargs):
    """ Takes "fn" that should return some data and tries to load the data
    dividing it into 2 buckets - cached "since_from" bucket and "today"
    bucket - then the data can be reduced into single value

    Data is cached if the stats_since and end time give a delta bigger
    than 24 hours - then only 24h is computed on the fly
    """
    if not end_time:
        end_time = datetime.utcnow().replace(second=0, microsecond=0)
    delta = end_time - stats_since

    if delta >= timedelta(minutes=60):
        log.info('cached_split_data:{}: adjusting start time '
                 'for hourly or daily intervals'.format(cache_key))
        stats_since = stats_since.replace(hour=0, minute=0)

    storage_key = 'buckets_split_data:' + cache_key + ':{}|{}'
    old_end_time = end_time.replace(hour=0, minute=0)

    final_storage_key = storage_key.format(delta.total_seconds(),
                                           old_end_time)
    older_data = None

    cdata = request.registry.cache_regions.redis_day_7.get(
        final_storage_key)

    if cdata:
        log.info("cached_split_data:{}: found old "
                 "bucket data".format(cache_key))
        older_data = cdata

    if (stats_since < end_time - h.time_deltas.get('24h')['delta'] and
            not cdata):
        log.info("cached_split_data:{}: didn't find the "
                 "start bucket in cache so load older data".format(cache_key))
        recent_stats_since = old_end_time
        older_data = fn(request, stats_since, recent_stats_since,
                        db_session=db_session, *args, **kwargs)
        request.registry.cache_regions.redis_day_7.set(final_storage_key,
                                                       older_data)
    elif stats_since < end_time - h.time_deltas.get('24h')['delta']:
        recent_stats_since = old_end_time
    else:
        recent_stats_since = stats_since

    log.info("cached_split_data:{}: loading fresh "
             "data buckets from last 24h ".format(cache_key))
    todays_data = fn(request, recent_stats_since, end_time,
                     db_session=db_session, *args, **kwargs)
    return older_data, todays_data
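A reduction sketch, assuming a hypothetical count_events callable with the same fn contract and the same request/caching setup as above; the two buckets are then folded into a single value:

older, today = get_cached_split_data(
    request, stats_since, end_time=None, fn=count_events,
    cache_key='events_total:1')
total = (older or 0) + (today or 0)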


def in_batches(seq, size):
    """
    Splits an iterable into batches of specified size
    :param seq (iterable)
    :param size integer
    """
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
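Note that despite the docstring, the argument must support len() and slicing (a list or tuple rather than an arbitrary iterable), for example:

list(in_batches(list(range(7)), 3))
# [[0, 1, 2], [3, 4, 5], [6]]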
@@ -1,512 +1,511 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2010-2016 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# AppEnlight Enterprise Edition, including its added features, Support
# services, and proprietary license terms, please see
# https://rhodecode.com/licenses/

from datetime import datetime
import math
import uuid
import hashlib
import copy
import urllib.parse
import logging
import sqlalchemy as sa

from appenlight.models import Base, Datastores
from appenlight.lib.utils.date_utils import convert_date
from appenlight.lib.utils import convert_es_type
from appenlight.models.slow_call import SlowCall
-from appenlight.lib.utils import cometd_request
+from appenlight.lib.utils import channelstream_request
from appenlight.lib.enums import ReportType, Language
from pyramid.threadlocal import get_current_registry, get_current_request
from sqlalchemy.dialects.postgresql import JSON
from ziggurat_foundations.models.base import BaseModel

log = logging.getLogger(__name__)

REPORT_TYPE_MATRIX = {
    'http_status': {"type": 'int',
                    "ops": ('eq', 'ne', 'ge', 'le',)},
    'group:priority': {"type": 'int',
                       "ops": ('eq', 'ne', 'ge', 'le',)},
    'duration': {"type": 'float',
                 "ops": ('ge', 'le',)},
    'url_domain': {"type": 'unicode',
                   "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
    'url_path': {"type": 'unicode',
                 "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
    'error': {"type": 'unicode',
              "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
    'tags:server_name': {"type": 'unicode',
                         "ops": ('eq', 'ne', 'startswith', 'endswith',
                                 'contains',)},
    'traceback': {"type": 'unicode',
                  "ops": ('contains',)},
    'group:occurences': {"type": 'int',
                         "ops": ('eq', 'ne', 'ge', 'le',)}
}
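The matrix maps filterable report fields to their value type and the comparison operators a query may use against them; a minimal validation sketch (illustrative only, the helper name is made up):

def is_allowed_filter(field, op, matrix=REPORT_TYPE_MATRIX):
    # hypothetical helper: accept only operators listed for a known field
    entry = matrix.get(field)
    return bool(entry and op in entry['ops'])

is_allowed_filter('http_status', 'ge')  # True
is_allowed_filter('traceback', 'eq')    # False - only 'contains' is allowed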

class Report(Base, BaseModel):
    __tablename__ = 'reports'
    __table_args__ = {'implicit_returning': False}

    id = sa.Column(sa.Integer, nullable=False, primary_key=True)
    group_id = sa.Column(sa.BigInteger,
                         sa.ForeignKey('reports_groups.id', ondelete='cascade',
                                       onupdate='cascade'))
    resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
    report_type = sa.Column(sa.Integer(), nullable=False, index=True)
    error = sa.Column(sa.UnicodeText(), index=True)
    extra = sa.Column(JSON(), default={})
    request = sa.Column(JSON(), nullable=False, default={})
    ip = sa.Column(sa.String(39), index=True, default='')
    username = sa.Column(sa.Unicode(255), default='')
    user_agent = sa.Column(sa.Unicode(255), default='')
    url = sa.Column(sa.UnicodeText(), index=True)
    request_id = sa.Column(sa.Text())
    request_stats = sa.Column(JSON(), nullable=False, default={})
    traceback = sa.Column(JSON(), nullable=False, default=None)
    traceback_hash = sa.Column(sa.Text())
    start_time = sa.Column(sa.DateTime(), default=datetime.utcnow,
                           server_default=sa.func.now())
    end_time = sa.Column(sa.DateTime())
    duration = sa.Column(sa.Float, default=0)
    http_status = sa.Column(sa.Integer, index=True)
    url_domain = sa.Column(sa.Unicode(100), index=True)
    url_path = sa.Column(sa.Unicode(255), index=True)
    tags = sa.Column(JSON(), nullable=False, default={})
    language = sa.Column(sa.Integer(), default=0)
    # this is used to determine partition for the report
    report_group_time = sa.Column(sa.DateTime(), default=datetime.utcnow,
                                  server_default=sa.func.now())

    logs = sa.orm.relationship(
        'Log',
        lazy='dynamic',
        passive_deletes=True,
        passive_updates=True,
        primaryjoin="and_(Report.request_id==Log.request_id, "
                    "Log.request_id != None, Log.request_id != '')",
        foreign_keys='[Log.request_id]')

    slow_calls = sa.orm.relationship('SlowCall',
                                     backref='detail',
                                     cascade="all, delete-orphan",
                                     passive_deletes=True,
                                     passive_updates=True,
                                     order_by='SlowCall.timestamp')

    def set_data(self, data, resource, protocol_version=None):
        self.http_status = data['http_status']
        self.priority = data['priority']
        self.error = data['error']
        report_language = data.get('language', '').lower()
        self.language = getattr(Language, report_language, Language.unknown)
        # we need a temp holder here to decide later
        # if we want to commit the tags if report is marked for creation
        self.tags = {
            'server_name': data['server'],
            'view_name': data['view_name']
        }
        if data.get('tags'):
            for tag_tuple in data['tags']:
                self.tags[tag_tuple[0]] = tag_tuple[1]
        self.traceback = data['traceback']
        stripped_traceback = self.stripped_traceback()
        tb_repr = repr(stripped_traceback).encode('utf8')
        self.traceback_hash = hashlib.sha1(tb_repr).hexdigest()
        url_info = urllib.parse.urlsplit(
            data.get('url', ''), allow_fragments=False)
        self.url_domain = url_info.netloc[:128]
        self.url_path = url_info.path[:2048]
        self.occurences = data['occurences']
        if self.error:
            self.report_type = ReportType.error
        else:
            self.report_type = ReportType.slow

        # but if its status is 404, it's a 404 type
        if self.http_status in [404, '404'] or self.error == '404 Not Found':
            self.report_type = ReportType.not_found
            self.error = ''

        self.generate_grouping_hash(data.get('appenlight.group_string',
                                             data.get('group_string')),
                                    resource.default_grouping,
                                    protocol_version)

        # details
        if data['http_status'] in [404, '404']:
            data = {"username": data["username"],
                    "ip": data["ip"],
                    "url": data["url"],
                    "user_agent": data["user_agent"]}
            if data.get('HTTP_REFERER') or data.get('http_referer'):
                data['HTTP_REFERER'] = data.get(
                    'HTTP_REFERER', '') or data.get('http_referer', '')

        self.resource_id = resource.resource_id
        self.username = data['username']
        self.user_agent = data['user_agent']
        self.ip = data['ip']
        self.extra = {}
        if data.get('extra'):
            for extra_tuple in data['extra']:
                self.extra[extra_tuple[0]] = extra_tuple[1]

        self.url = data['url']
        self.request_id = data.get('request_id', '').replace('-', '') or str(
            uuid.uuid4())
        request_data = data.get('request', {})

        self.request = request_data
        self.request_stats = data.get('request_stats', {})
        traceback = data.get('traceback')
        if not traceback:
            traceback = data.get('frameinfo')
        self.traceback = traceback
        start_date = convert_date(data.get('start_time'))
        if not self.start_time or self.start_time < start_date:
            self.start_time = start_date

        self.end_time = convert_date(data.get('end_time'), False)
        self.duration = 0

        if self.start_time and self.end_time:
            d = self.end_time - self.start_time
            self.duration = d.total_seconds()

        # update tags with other vars
        if self.username:
            self.tags['user_name'] = self.username
        self.tags['report_language'] = Language.key_from_value(self.language)
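For orientation, a minimal, made-up data payload covering the keys set_data reads above (values are placeholders, not a documented schema):

example_data = {
    'http_status': 500, 'priority': 5, 'error': 'ValueError: boom',
    'language': 'python', 'server': 'web01', 'view_name': 'app:index',
    'tags': [('color', 'red')], 'extra': [('build', '42')],
    'username': 'foo', 'user_agent': 'curl', 'ip': '127.0.0.1',
    'url': 'https://example.com/index', 'request_id': '',
    'request': {}, 'request_stats': {}, 'traceback': [],
    'occurences': 1, 'start_time': '2016-05-01T10:00:00',
    'end_time': '2016-05-01T10:00:01',
}
# report.set_data(example_data, resource) would then populate the columns above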
201 def add_slow_calls(self, data, report_group):
201 def add_slow_calls(self, data, report_group):
202 slow_calls = []
202 slow_calls = []
203 for call in data.get('slow_calls', []):
203 for call in data.get('slow_calls', []):
204 sc_inst = SlowCall()
204 sc_inst = SlowCall()
205 sc_inst.set_data(call, resource_id=self.resource_id,
205 sc_inst.set_data(call, resource_id=self.resource_id,
206 report_group=report_group)
206 report_group=report_group)
207 slow_calls.append(sc_inst)
207 slow_calls.append(sc_inst)
208 self.slow_calls.extend(slow_calls)
208 self.slow_calls.extend(slow_calls)
209 return slow_calls
209 return slow_calls
210
210
211 def get_dict(self, request, details=False, exclude_keys=None,
211 def get_dict(self, request, details=False, exclude_keys=None,
212 include_keys=None):
212 include_keys=None):
213 from appenlight.models.services.report_group import ReportGroupService
213 from appenlight.models.services.report_group import ReportGroupService
214 instance_dict = super(Report, self).get_dict()
214 instance_dict = super(Report, self).get_dict()
215 instance_dict['req_stats'] = self.req_stats()
215 instance_dict['req_stats'] = self.req_stats()
216 instance_dict['group'] = {}
216 instance_dict['group'] = {}
217 instance_dict['group']['id'] = self.report_group.id
217 instance_dict['group']['id'] = self.report_group.id
218 instance_dict['group'][
218 instance_dict['group'][
219 'total_reports'] = self.report_group.total_reports
219 'total_reports'] = self.report_group.total_reports
220 instance_dict['group']['last_report'] = self.report_group.last_report
220 instance_dict['group']['last_report'] = self.report_group.last_report
221 instance_dict['group']['priority'] = self.report_group.priority
221 instance_dict['group']['priority'] = self.report_group.priority
222 instance_dict['group']['occurences'] = self.report_group.occurences
222 instance_dict['group']['occurences'] = self.report_group.occurences
223 instance_dict['group'][
223 instance_dict['group'][
224 'last_timestamp'] = self.report_group.last_timestamp
224 'last_timestamp'] = self.report_group.last_timestamp
225 instance_dict['group'][
225 instance_dict['group'][
226 'first_timestamp'] = self.report_group.first_timestamp
226 'first_timestamp'] = self.report_group.first_timestamp
227 instance_dict['group']['public'] = self.report_group.public
227 instance_dict['group']['public'] = self.report_group.public
228 instance_dict['group']['fixed'] = self.report_group.fixed
228 instance_dict['group']['fixed'] = self.report_group.fixed
229 instance_dict['group']['read'] = self.report_group.read
229 instance_dict['group']['read'] = self.report_group.read
230 instance_dict['group'][
230 instance_dict['group'][
231 'average_duration'] = self.report_group.average_duration
231 'average_duration'] = self.report_group.average_duration
232
232
233 instance_dict[
233 instance_dict[
234 'resource_name'] = self.report_group.application.resource_name
234 'resource_name'] = self.report_group.application.resource_name
235 instance_dict['report_type'] = self.report_type
235 instance_dict['report_type'] = self.report_type
236
236
237 if instance_dict['http_status'] == 404 and not instance_dict['error']:
237 if instance_dict['http_status'] == 404 and not instance_dict['error']:
238 instance_dict['error'] = '404 Not Found'
238 instance_dict['error'] = '404 Not Found'
239
239
240 if details:
240 if details:
241 instance_dict['affected_users_count'] = \
241 instance_dict['affected_users_count'] = \
242 ReportGroupService.affected_users_count(self.report_group)
242 ReportGroupService.affected_users_count(self.report_group)
243 instance_dict['top_affected_users'] = [
243 instance_dict['top_affected_users'] = [
244 {'username': u.username, 'count': u.count} for u in
244 {'username': u.username, 'count': u.count} for u in
245 ReportGroupService.top_affected_users(self.report_group)]
245 ReportGroupService.top_affected_users(self.report_group)]
246 instance_dict['application'] = {'integrations': []}
246 instance_dict['application'] = {'integrations': []}
247 for integration in self.report_group.application.integrations:
247 for integration in self.report_group.application.integrations:
248 if integration.front_visible:
248 if integration.front_visible:
249 instance_dict['application']['integrations'].append(
249 instance_dict['application']['integrations'].append(
250 {'name': integration.integration_name,
250 {'name': integration.integration_name,
251 'action': integration.integration_action})
251 'action': integration.integration_action})
252 instance_dict['comments'] = [c.get_dict() for c in
252 instance_dict['comments'] = [c.get_dict() for c in
253 self.report_group.comments]
253 self.report_group.comments]
254
254
255 instance_dict['group']['next_report'] = None
255 instance_dict['group']['next_report'] = None
256 instance_dict['group']['previous_report'] = None
256 instance_dict['group']['previous_report'] = None
257 next_in_group = self.get_next_in_group(request)
257 next_in_group = self.get_next_in_group(request)
258 previous_in_group = self.get_previous_in_group(request)
258 previous_in_group = self.get_previous_in_group(request)
259 if next_in_group:
259 if next_in_group:
260 instance_dict['group']['next_report'] = next_in_group
260 instance_dict['group']['next_report'] = next_in_group
261 if previous_in_group:
261 if previous_in_group:
262 instance_dict['group']['previous_report'] = previous_in_group
262 instance_dict['group']['previous_report'] = previous_in_group
263
263
264 # slow call ordering
264 # slow call ordering
265 def find_parent(row, data):
265 def find_parent(row, data):
266 for r in reversed(data):
266 for r in reversed(data):
267 try:
267 try:
268 if (row['timestamp'] > r['timestamp'] and
268 if (row['timestamp'] > r['timestamp'] and
269 row['end_time'] < r['end_time']):
269 row['end_time'] < r['end_time']):
270 return r
270 return r
271 except TypeError as e:
271 except TypeError as e:
272 log.warning('reports_view.find_parent: %s' % e)
272 log.warning('reports_view.find_parent: %s' % e)
273 return None
273 return None
274
274
275 new_calls = []
275 new_calls = []
276 calls = [c.get_dict() for c in self.slow_calls]
276 calls = [c.get_dict() for c in self.slow_calls]
277 while calls:
277 while calls:
278 # start from end
278 # start from end
279 for x in range(len(calls) - 1, -1, -1):
279 for x in range(len(calls) - 1, -1, -1):
280 parent = find_parent(calls[x], calls)
280 parent = find_parent(calls[x], calls)
281 if parent:
281 if parent:
282 parent['children'].append(calls[x])
282 parent['children'].append(calls[x])
283 else:
283 else:
284 # no parent at all? append to new calls anyways
284 # no parent at all? append to new calls anyways
285 new_calls.append(calls[x])
285 new_calls.append(calls[x])
286 # print 'append', calls[x]
286 # print 'append', calls[x]
287 del calls[x]
287 del calls[x]
288 break
288 break
289 instance_dict['slow_calls'] = new_calls
289 instance_dict['slow_calls'] = new_calls
290
290
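The ordering loop above nests slow calls by time containment: a call whose interval lies strictly inside another call's interval becomes that call's child, and only top-level calls remain in new_calls. A minimal illustration under hypothetical data, assuming each call dict already carries a 'children' list (the nesting code relies on that key being present):

from datetime import datetime, timedelta

start = datetime(2016, 6, 21, 12, 0, 0)
# Hypothetical flat list: one outer SQL call and one call nested inside it.
calls = [
    {'type': 'sql', 'timestamp': start,
     'end_time': start + timedelta(seconds=1.0), 'children': []},
    {'type': 'sql', 'timestamp': start + timedelta(seconds=0.2),
     'end_time': start + timedelta(seconds=0.5), 'children': []},
]
# After the loop, the second call sits in calls[0]['children'] and
# new_calls holds only the outer call.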
291 instance_dict['front_url'] = self.get_public_url(request)
291 instance_dict['front_url'] = self.get_public_url(request)
292
292
293 exclude_keys_list = exclude_keys or []
293 exclude_keys_list = exclude_keys or []
294 include_keys_list = include_keys or []
294 include_keys_list = include_keys or []
295 for k in list(instance_dict.keys()):
295 for k in list(instance_dict.keys()):
296 if k == 'group':
296 if k == 'group':
297 continue
297 continue
298 if (k in exclude_keys_list or
298 if (k in exclude_keys_list or
299 (k not in include_keys_list and include_keys)):
299 (k not in include_keys_list and include_keys)):
300 del instance_dict[k]
300 del instance_dict[k]
301 return instance_dict
301 return instance_dict
302
302
303 def get_previous_in_group(self, request):
303 def get_previous_in_group(self, request):
304 query = {
304 query = {
305 "size": 1,
305 "size": 1,
306 "query": {
306 "query": {
307 "filtered": {
307 "filtered": {
308 "filter": {
308 "filter": {
309 "and": [{"term": {"group_id": self.group_id}},
309 "and": [{"term": {"group_id": self.group_id}},
310 {"range": {"pg_id": {"lt": self.id}}}]
310 {"range": {"pg_id": {"lt": self.id}}}]
311 }
311 }
312 }
312 }
313 },
313 },
314 "sort": [
314 "sort": [
315 {"_doc": {"order": "desc"}},
315 {"_doc": {"order": "desc"}},
316 ],
316 ],
317 }
317 }
318 result = request.es_conn.search(query, index=self.partition_id,
318 result = request.es_conn.search(query, index=self.partition_id,
319 doc_type='report')
319 doc_type='report')
320 if result['hits']['total']:
320 if result['hits']['total']:
321 return result['hits']['hits'][0]['_source']['pg_id']
321 return result['hits']['hits'][0]['_source']['pg_id']
322
322
323 def get_next_in_group(self, request):
323 def get_next_in_group(self, request):
324 query = {
324 query = {
325 "size": 1,
325 "size": 1,
326 "query": {
326 "query": {
327 "filtered": {
327 "filtered": {
328 "filter": {
328 "filter": {
329 "and": [{"term": {"group_id": self.group_id}},
329 "and": [{"term": {"group_id": self.group_id}},
330 {"range": {"pg_id": {"gt": self.id}}}]
330 {"range": {"pg_id": {"gt": self.id}}}]
331 }
331 }
332 }
332 }
333 },
333 },
334 "sort": [
334 "sort": [
335 {"_doc": {"order": "asc"}},
335 {"_doc": {"order": "asc"}},
336 ],
336 ],
337 }
337 }
338 result = request.es_conn.search(query, index=self.partition_id,
338 result = request.es_conn.search(query, index=self.partition_id,
339 doc_type='report')
339 doc_type='report')
340 if result['hits']['total']:
340 if result['hits']['total']:
341 return result['hits']['hits'][0]['_source']['pg_id']
341 return result['hits']['hits'][0]['_source']['pg_id']
342
342
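The two lookups above are mirror images: both filter on the current group_id, page by pg_id relative to the current report, and sort by _doc in opposite directions so that a single adjacent hit comes back. A sketch of the shared query shape with a hypothetical helper name, using the same Elasticsearch 1.x 'filtered' DSL as the code above:

def _adjacent_report_query(group_id, report_id, bound='lt', order='desc'):
    # bound/order are 'lt'/'desc' for the previous report,
    # 'gt'/'asc' for the next one.
    return {
        "size": 1,
        "query": {
            "filtered": {
                "filter": {
                    "and": [{"term": {"group_id": group_id}},
                            {"range": {"pg_id": {bound: report_id}}}]
                }
            }
        },
        "sort": [{"_doc": {"order": order}}],
    }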
343 def get_public_url(self, request=None, report_group=None, _app_url=None):
343 def get_public_url(self, request=None, report_group=None, _app_url=None):
344 """
344 """
345 Returns the URL a user can use to visit a specific report
345 Returns the URL a user can use to visit a specific report
346 """
346 """
347 if not request:
347 if not request:
348 request = get_current_request()
348 request = get_current_request()
349 url = request.route_url('/', _app_url=_app_url)
349 url = request.route_url('/', _app_url=_app_url)
350 if report_group:
350 if report_group:
351 return (url + 'ui/report/%s/%s') % (report_group.id, self.id)
351 return (url + 'ui/report/%s/%s') % (report_group.id, self.id)
352 return (url + 'ui/report/%s/%s') % (self.group_id, self.id)
352 return (url + 'ui/report/%s/%s') % (self.group_id, self.id)
353
353
354 def req_stats(self):
354 def req_stats(self):
355 stats = self.request_stats.copy()
355 stats = self.request_stats.copy()
356 stats['percentages'] = {}
356 stats['percentages'] = {}
357 stats['percentages']['main'] = 100.0
357 stats['percentages']['main'] = 100.0
358 main = stats.get('main', 0.0)
358 main = stats.get('main', 0.0)
359 if not main:
359 if not main:
360 return None
360 return None
361 for name, call_time in stats.items():
361 for name, call_time in stats.items():
362 if ('calls' not in name and 'main' not in name and
362 if ('calls' not in name and 'main' not in name and
363 'percentages' not in name):
363 'percentages' not in name):
364 stats['main'] -= call_time
364 stats['main'] -= call_time
365 stats['percentages'][name] = math.floor(
365 stats['percentages'][name] = math.floor(
366 (call_time / main * 100.0))
366 (call_time / main * 100.0))
367 stats['percentages']['main'] -= stats['percentages'][name]
367 stats['percentages']['main'] -= stats['percentages'][name]
368 if stats['percentages']['main'] < 0.0:
368 if stats['percentages']['main'] < 0.0:
369 stats['percentages']['main'] = 0.0
369 stats['percentages']['main'] = 0.0
370 stats['main'] = 0.0
370 stats['main'] = 0.0
371 return stats
371 return stats
372
372
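A worked example of the split computed above, with assumed timings; 'sql_calls' is skipped because its name contains 'calls', and whatever time is not attributed to a named layer stays in 'main':

# Hypothetical request_stats for a single report:
stats = {'main': 200.0, 'sql': 50.0, 'sql_calls': 3, 'tmpl': 30.0}
# 'sql':  percentages['sql']  = floor(50 / 200 * 100) = 25, remaining main = 150.0
# 'tmpl': percentages['tmpl'] = floor(30 / 200 * 100) = 15, remaining main = 120.0
# req_stats() would then return:
# {'main': 120.0, 'sql': 50.0, 'sql_calls': 3, 'tmpl': 30.0,
#  'percentages': {'main': 60.0, 'sql': 25, 'tmpl': 15}}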
373 def generate_grouping_hash(self, hash_string=None, default_grouping=None,
373 def generate_grouping_hash(self, hash_string=None, default_grouping=None,
374 protocol_version=None):
374 protocol_version=None):
375 """
375 """
376 Generates the SHA1 hash that is used to group reports together
376 Generates the SHA1 hash that is used to group reports together
377 """
377 """
378 if not hash_string:
378 if not hash_string:
379 location = self.tags.get('view_name') or self.url_path
379 location = self.tags.get('view_name') or self.url_path
380 server_name = self.tags.get('server_name') or ''
380 server_name = self.tags.get('server_name') or ''
381 if default_grouping == 'url_traceback':
381 if default_grouping == 'url_traceback':
382 hash_string = '%s_%s_%s' % (self.traceback_hash, location,
382 hash_string = '%s_%s_%s' % (self.traceback_hash, location,
383 self.error)
383 self.error)
384 if self.language == Language.javascript:
384 if self.language == Language.javascript:
385 hash_string = '%s_%s' % (self.traceback_hash, self.error)
385 hash_string = '%s_%s' % (self.traceback_hash, self.error)
386
386
387 elif default_grouping == 'traceback_server':
387 elif default_grouping == 'traceback_server':
388 hash_string = '%s_%s' % (self.traceback_hash, server_name)
388 hash_string = '%s_%s' % (self.traceback_hash, server_name)
389 if self.language == Language.javascript:
389 if self.language == Language.javascript:
390 hash_string = '%s_%s' % (self.traceback_hash, server_name)
390 hash_string = '%s_%s' % (self.traceback_hash, server_name)
391 else:
391 else:
392 hash_string = '%s_%s' % (self.error, location)
392 hash_string = '%s_%s' % (self.error, location)
393 binary_string = hash_string.encode('utf8')
393 binary_string = hash_string.encode('utf8')
394 self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
394 self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
395 return self.grouping_hash
395 return self.grouping_hash
396
396
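For illustration, this is how the 'url_traceback' strategy above would derive a grouping hash, with hypothetical inputs standing in for the report's attributes:

import hashlib

traceback_hash = 'c7b3a5...'                          # self.traceback_hash (made up)
location = 'home:index'                               # tags['view_name'] or url_path
error = 'IndexError: list index out of range'         # self.error
hash_string = '%s_%s_%s' % (traceback_hash, location, error)
grouping_hash = hashlib.sha1(hash_string.encode('utf8')).hexdigest()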
397 def stripped_traceback(self):
397 def stripped_traceback(self):
398 """
398 """
399 Traceback without local vars
399 Traceback without local vars
400 """
400 """
401 stripped_traceback = copy.deepcopy(self.traceback)
401 stripped_traceback = copy.deepcopy(self.traceback)
402
402
403 if isinstance(stripped_traceback, list):
403 if isinstance(stripped_traceback, list):
404 for row in stripped_traceback:
404 for row in stripped_traceback:
405 row.pop('vars', None)
405 row.pop('vars', None)
406 return stripped_traceback
406 return stripped_traceback
407
407
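In effect each traceback frame keeps its location data and only loses the captured locals; a toy example with an assumed frame layout:

# Illustrative only -- the real frame keys come from the client payload.
row = {'file': 'views.py', 'line': 42, 'fn': 'index',
       'vars': [('user_id', 7)]}
row.pop('vars', None)          # what stripped_traceback() does per frame
# row -> {'file': 'views.py', 'line': 42, 'fn': 'index'}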
408 def notify_channel(self, report_group):
408 def notify_channel(self, report_group):
409 """
409 """
410 Sends a notification to the websocket channel
410 Sends a notification to the websocket channel
411 """
411 """
412 settings = get_current_registry().settings
412 settings = get_current_registry().settings
413 log.info('notify cometd')
413 log.info('notify channelstream')
414 if self.report_type != ReportType.error:
414 if self.report_type != ReportType.error:
415 return
415 return
416 payload = {
416 payload = {
417 'type': 'message',
417 'type': 'message',
418 "user": '__system__',
418 "user": '__system__',
419 "channel": 'app_%s' % self.resource_id,
419 "channel": 'app_%s' % self.resource_id,
420 'message': {
420 'message': {
421 'type': 'report',
421 'topic': 'front_dashboard.new_topic',
422 'report': {
422 'report': {
423 'group': {
423 'group': {
424 'priority': report_group.priority,
424 'priority': report_group.priority,
425 'first_timestamp': report_group.first_timestamp,
425 'first_timestamp': report_group.first_timestamp,
426 'last_timestamp': report_group.last_timestamp,
426 'last_timestamp': report_group.last_timestamp,
427 'average_duration': report_group.average_duration,
427 'average_duration': report_group.average_duration,
428 'occurences': report_group.occurences
428 'occurences': report_group.occurences
429 },
429 },
430 'report_id': self.id,
430 'report_id': self.id,
431 'group_id': self.group_id,
431 'group_id': self.group_id,
432 'resource_id': self.resource_id,
432 'resource_id': self.resource_id,
433 'http_status': self.http_status,
433 'http_status': self.http_status,
434 'url_domain': self.url_domain,
434 'url_domain': self.url_domain,
435 'url_path': self.url_path,
435 'url_path': self.url_path,
436 'error': self.error or '',
436 'error': self.error or '',
437 'server': self.tags.get('server_name'),
437 'server': self.tags.get('server_name'),
438 'view_name': self.tags.get('view_name'),
438 'view_name': self.tags.get('view_name'),
439 'front_url': self.get_public_url(),
439 'front_url': self.get_public_url(),
440 }
440 }
441 }
441 }
442
442
443 }
443 }
444
444 channelstream_request(settings['cometd.secret'], '/message', [payload],
445 cometd_request(settings['cometd.secret'], '/message', [payload],
446 servers=[settings['cometd_servers']])
445 servers=[settings['cometd_servers']])
447
446
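The switch from cometd_request to channelstream_request (together with the 'topic' key above) is the point of this commit: the signed notification now goes to a channelstream server's /message endpoint. A rough sketch of what such a helper typically does, assuming itsdangerous-based signing and an x-channelstream-secret header; the real helper lives in appenlight.lib and its exact headers and error handling may differ:

import json
import requests
from itsdangerous import TimestampSigner

def channelstream_request_sketch(secret, endpoint, payload, servers=None):
    # Sign the endpoint with the shared secret so the server can verify us.
    signature = TimestampSigner(secret).sign(endpoint)
    headers = {'x-channelstream-secret': signature,
               'Content-Type': 'application/json'}
    for server in (servers or []):
        # default=str keeps this sketch simple for datetime fields in the payload
        requests.post('%s%s' % (server, endpoint),
                      data=json.dumps(payload, default=str),
                      headers=headers, timeout=2)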
448 def es_doc(self):
447 def es_doc(self):
449 tags = {}
448 tags = {}
450 tag_list = []
449 tag_list = []
451 for name, value in self.tags.items():
450 for name, value in self.tags.items():
452 name = name.replace('.', '_')
451 name = name.replace('.', '_')
453 tag_list.append(name)
452 tag_list.append(name)
454 tags[name] = {
453 tags[name] = {
455 "values": convert_es_type(value),
454 "values": convert_es_type(value),
456 "numeric_values": value if (
455 "numeric_values": value if (
457 isinstance(value, (int, float)) and
456 isinstance(value, (int, float)) and
458 not isinstance(value, bool)) else None}
457 not isinstance(value, bool)) else None}
459
458
460 if 'user_name' not in self.tags and self.username:
459 if 'user_name' not in self.tags and self.username:
461 tags["user_name"] = {"value": [self.username],
460 tags["user_name"] = {"value": [self.username],
462 "numeric_value": None}
461 "numeric_value": None}
463 return {
462 return {
464 '_id': str(self.id),
463 '_id': str(self.id),
465 'pg_id': str(self.id),
464 'pg_id': str(self.id),
466 'resource_id': self.resource_id,
465 'resource_id': self.resource_id,
467 'http_status': self.http_status or '',
466 'http_status': self.http_status or '',
468 'start_time': self.start_time,
467 'start_time': self.start_time,
469 'end_time': self.end_time,
468 'end_time': self.end_time,
470 'url_domain': self.url_domain if self.url_domain else '',
469 'url_domain': self.url_domain if self.url_domain else '',
471 'url_path': self.url_path if self.url_path else '',
470 'url_path': self.url_path if self.url_path else '',
472 'duration': self.duration,
471 'duration': self.duration,
473 'error': self.error if self.error else '',
472 'error': self.error if self.error else '',
474 'report_type': self.report_type,
473 'report_type': self.report_type,
475 'request_id': self.request_id,
474 'request_id': self.request_id,
476 'ip': self.ip,
475 'ip': self.ip,
477 'group_id': str(self.group_id),
476 'group_id': str(self.group_id),
478 '_parent': str(self.group_id),
477 '_parent': str(self.group_id),
479 'tags': tags,
478 'tags': tags,
480 'tag_list': tag_list
479 'tag_list': tag_list
481 }
480 }
482
481
483 @property
482 @property
484 def partition_id(self):
483 def partition_id(self):
485 return 'rcae_r_%s' % self.report_group_time.strftime('%Y_%m')
484 return 'rcae_r_%s' % self.report_group_time.strftime('%Y_%m')
486
485
487
486
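The partition_id property above shards reports into monthly Elasticsearch indices derived from the group's timestamp, for example:

from datetime import datetime
# A report whose group dates from June 2016 is indexed into:
'rcae_r_%s' % datetime(2016, 6, 21).strftime('%Y_%m')    # -> 'rcae_r_2016_06'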
488 def after_insert(mapper, connection, target):
487 def after_insert(mapper, connection, target):
489 if not hasattr(target, '_skip_ft_index'):
488 if not hasattr(target, '_skip_ft_index'):
490 data = target.es_doc()
489 data = target.es_doc()
491 data.pop('_id', None)
490 data.pop('_id', None)
492 Datastores.es.index(target.partition_id, 'report', data,
491 Datastores.es.index(target.partition_id, 'report', data,
493 parent=target.group_id, id=target.id)
492 parent=target.group_id, id=target.id)
494
493
495
494
496 def after_update(mapper, connection, target):
495 def after_update(mapper, connection, target):
497 if not hasattr(target, '_skip_ft_index'):
496 if not hasattr(target, '_skip_ft_index'):
498 data = target.es_doc()
497 data = target.es_doc()
499 data.pop('_id', None)
498 data.pop('_id', None)
500 Datastores.es.index(target.partition_id, 'report', data,
499 Datastores.es.index(target.partition_id, 'report', data,
501 parent=target.group_id, id=target.id)
500 parent=target.group_id, id=target.id)
502
501
503
502
504 def after_delete(mapper, connection, target):
503 def after_delete(mapper, connection, target):
505 if not hasattr(target, '_skip_ft_index'):
504 if not hasattr(target, '_skip_ft_index'):
506 query = {'term': {'pg_id': target.id}}
505 query = {'term': {'pg_id': target.id}}
507 Datastores.es.delete_by_query(target.partition_id, 'report', query)
506 Datastores.es.delete_by_query(target.partition_id, 'report', query)
508
507
509
508
510 sa.event.listen(Report, 'after_insert', after_insert)
509 sa.event.listen(Report, 'after_insert', after_insert)
511 sa.event.listen(Report, 'after_update', after_update)
510 sa.event.listen(Report, 'after_update', after_update)
512 sa.event.listen(Report, 'after_delete', after_delete)
511 sa.event.listen(Report, 'after_delete', after_delete)
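The three listeners above mirror every insert, update and delete into Elasticsearch unless the instance carries a _skip_ft_index attribute (only its presence is checked). A minimal sketch of opting out, e.g. during a bulk import, with the session object name assumed:

report = Report()                  # hypothetical, column values omitted
report._skip_ft_index = True       # presence alone disables the ES hooks
DBSession.add(report)              # assumed SQLAlchemy session from appenlight.models
DBSession.flush()                  # after_insert fires but skips indexing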