reports: fetch previous/next from elasticsearch
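
This change swaps the SQLAlchemy-based previous/next lookups for Elasticsearch queries against the report's monthly partition, and get_dict() now stores the neighbouring pg_id values directly instead of resolving ORM objects. A minimal, hypothetical usage sketch (assuming an already-loaded `report` instance and a Pyramid `request` exposing the Elasticsearch client as `request.es_conn`, which the new methods expect):

    # Hypothetical sketch, not part of this changeset: navigating between
    # reports of the same group after this change.
    previous_id = report.get_previous_in_group(request)  # pg_id of an earlier report, or None
    next_id = report.get_next_in_group(request)          # pg_id of a later report, or None

    # get_dict(details=True) now carries the same bare ids instead of ORM objects
    payload = report.get_dict(request, details=True)
    previous_id = payload['group']['previous_report']
    next_id = payload['group']['next_report']
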
@@ -1,489 +1,512 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 from datetime import datetime
23 23 import math
24 24 import uuid
25 25 import hashlib
26 26 import copy
27 27 import urllib.parse
28 28 import logging
29 29 import sqlalchemy as sa
30 30
31 31 from appenlight.models import Base, Datastores
32 32 from appenlight.lib.utils.date_utils import convert_date
33 33 from appenlight.lib.utils import convert_es_type
34 34 from appenlight.models.slow_call import SlowCall
35 35 from appenlight.lib.utils import cometd_request
36 36 from appenlight.lib.enums import ReportType, Language
37 37 from pyramid.threadlocal import get_current_registry, get_current_request
38 38 from sqlalchemy.dialects.postgresql import JSON
39 39 from ziggurat_foundations.models.base import BaseModel
40 40
41 41 log = logging.getLogger(__name__)
42 42
43 43 REPORT_TYPE_MATRIX = {
44 44 'http_status': {"type": 'int',
45 45 "ops": ('eq', 'ne', 'ge', 'le',)},
46 46 'group:priority': {"type": 'int',
47 47 "ops": ('eq', 'ne', 'ge', 'le',)},
48 48 'duration': {"type": 'float',
49 49 "ops": ('ge', 'le',)},
50 50 'url_domain': {"type": 'unicode',
51 51 "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
52 52 'url_path': {"type": 'unicode',
53 53 "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
54 54 'error': {"type": 'unicode',
55 55 "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
56 56 'tags:server_name': {"type": 'unicode',
57 57 "ops": ('eq', 'ne', 'startswith', 'endswith',
58 58 'contains',)},
59 59 'traceback': {"type": 'unicode',
60 60 "ops": ('contains',)},
61 61 'group:occurences': {"type": 'int',
62 62 "ops": ('eq', 'ne', 'ge', 'le',)}
63 63 }
64 64
65 65
66 66 class Report(Base, BaseModel):
67 67 __tablename__ = 'reports'
68 68 __table_args__ = {'implicit_returning': False}
69 69
70 70 id = sa.Column(sa.Integer, nullable=False, primary_key=True)
71 71 group_id = sa.Column(sa.BigInteger,
72 72 sa.ForeignKey('reports_groups.id', ondelete='cascade',
73 73 onupdate='cascade'))
74 74 resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
75 75 report_type = sa.Column(sa.Integer(), nullable=False, index=True)
76 76 error = sa.Column(sa.UnicodeText(), index=True)
77 77 extra = sa.Column(JSON(), default={})
78 78 request = sa.Column(JSON(), nullable=False, default={})
79 79 ip = sa.Column(sa.String(39), index=True, default='')
80 80 username = sa.Column(sa.Unicode(255), default='')
81 81 user_agent = sa.Column(sa.Unicode(255), default='')
82 82 url = sa.Column(sa.UnicodeText(), index=True)
83 83 request_id = sa.Column(sa.Text())
84 84 request_stats = sa.Column(JSON(), nullable=False, default={})
85 85 traceback = sa.Column(JSON(), nullable=False, default=None)
86 86 traceback_hash = sa.Column(sa.Text())
87 87 start_time = sa.Column(sa.DateTime(), default=datetime.utcnow,
88 88 server_default=sa.func.now())
89 89 end_time = sa.Column(sa.DateTime())
90 90 duration = sa.Column(sa.Float, default=0)
91 91 http_status = sa.Column(sa.Integer, index=True)
92 92 url_domain = sa.Column(sa.Unicode(100), index=True)
93 93 url_path = sa.Column(sa.Unicode(255), index=True)
94 94 tags = sa.Column(JSON(), nullable=False, default={})
95 95 language = sa.Column(sa.Integer(), default=0)
96 96 # this is used to determine partition for the report
97 97 report_group_time = sa.Column(sa.DateTime(), default=datetime.utcnow,
98 98 server_default=sa.func.now())
99 99
100 100 logs = sa.orm.relationship(
101 101 'Log',
102 102 lazy='dynamic',
103 103 passive_deletes=True,
104 104 passive_updates=True,
105 105 primaryjoin="and_(Report.request_id==Log.request_id, "
106 106 "Log.request_id != None, Log.request_id != '')",
107 107 foreign_keys='[Log.request_id]')
108 108
109 109 slow_calls = sa.orm.relationship('SlowCall',
110 110 backref='detail',
111 111 cascade="all, delete-orphan",
112 112 passive_deletes=True,
113 113 passive_updates=True,
114 114 order_by='SlowCall.timestamp')
115 115
116 116 def set_data(self, data, resource, protocol_version=None):
117 117 self.http_status = data['http_status']
118 118 self.priority = data['priority']
119 119 self.error = data['error']
120 120 report_language = data.get('language', '').lower()
121 121 self.language = getattr(Language, report_language, Language.unknown)
122 122 # we need temp holder here to decide later
123 123 # if we want to commit the tags if report is marked for creation
124 124 self.tags = {
125 125 'server_name': data['server'],
126 126 'view_name': data['view_name']
127 127 }
128 128 if data.get('tags'):
129 129 for tag_tuple in data['tags']:
130 130 self.tags[tag_tuple[0]] = tag_tuple[1]
131 131 self.traceback = data['traceback']
132 132 stripped_traceback = self.stripped_traceback()
133 133 tb_repr = repr(stripped_traceback).encode('utf8')
134 134 self.traceback_hash = hashlib.sha1(tb_repr).hexdigest()
135 135 url_info = urllib.parse.urlsplit(
136 136 data.get('url', ''), allow_fragments=False)
137 137 self.url_domain = url_info.netloc[:128]
138 138 self.url_path = url_info.path[:2048]
139 139 self.occurences = data['occurences']
140 140 if self.error:
141 141 self.report_type = ReportType.error
142 142 else:
143 143 self.report_type = ReportType.slow
144 144
145 145 # but if the status is 404, treat it as a 404-type report
146 146 if self.http_status in [404, '404'] or self.error == '404 Not Found':
147 147 self.report_type = ReportType.not_found
148 148 self.error = ''
149 149
150 150 self.generate_grouping_hash(data.get('appenlight.group_string',
151 151 data.get('group_string')),
152 152 resource.default_grouping,
153 153 protocol_version)
154 154
155 155 # details
156 156 if data['http_status'] in [404, '404']:
157 157 data = {"username": data["username"],
158 158 "ip": data["ip"],
159 159 "url": data["url"],
160 160 "user_agent": data["user_agent"]}
161 161 if data.get('HTTP_REFERER') or data.get('http_referer'):
162 162 data['HTTP_REFERER'] = data.get(
163 163 'HTTP_REFERER', '') or data.get('http_referer', '')
164 164
165 165 self.resource_id = resource.resource_id
166 166 self.username = data['username']
167 167 self.user_agent = data['user_agent']
168 168 self.ip = data['ip']
169 169 self.extra = {}
170 170 if data.get('extra'):
171 171 for extra_tuple in data['extra']:
172 172 self.extra[extra_tuple[0]] = extra_tuple[1]
173 173
174 174 self.url = data['url']
175 175 self.request_id = data.get('request_id', '').replace('-', '') or str(
176 176 uuid.uuid4())
177 177 request_data = data.get('request', {})
178 178
179 179 self.request = request_data
180 180 self.request_stats = data.get('request_stats', {})
181 181 traceback = data.get('traceback')
182 182 if not traceback:
183 183 traceback = data.get('frameinfo')
184 184 self.traceback = traceback
185 185 start_date = convert_date(data.get('start_time'))
186 186 if not self.start_time or self.start_time < start_date:
187 187 self.start_time = start_date
188 188
189 189 self.end_time = convert_date(data.get('end_time'), False)
190 190 self.duration = 0
191 191
192 192 if self.start_time and self.end_time:
193 193 d = self.end_time - self.start_time
194 194 self.duration = d.total_seconds()
195 195
196 196 # update tags with other vars
197 197 if self.username:
198 198 self.tags['user_name'] = self.username
199 199 self.tags['report_language'] = Language.key_from_value(self.language)
200 200
201 201 def add_slow_calls(self, data, report_group):
202 202 slow_calls = []
203 203 for call in data.get('slow_calls', []):
204 204 sc_inst = SlowCall()
205 205 sc_inst.set_data(call, resource_id=self.resource_id,
206 206 report_group=report_group)
207 207 slow_calls.append(sc_inst)
208 208 self.slow_calls.extend(slow_calls)
209 209 return slow_calls
210 210
211 211 def get_dict(self, request, details=False, exclude_keys=None,
212 212 include_keys=None):
213 213 from appenlight.models.services.report_group import ReportGroupService
214 214 instance_dict = super(Report, self).get_dict()
215 215 instance_dict['req_stats'] = self.req_stats()
216 216 instance_dict['group'] = {}
217 217 instance_dict['group']['id'] = self.report_group.id
218 218 instance_dict['group'][
219 219 'total_reports'] = self.report_group.total_reports
220 220 instance_dict['group']['last_report'] = self.report_group.last_report
221 221 instance_dict['group']['priority'] = self.report_group.priority
222 222 instance_dict['group']['occurences'] = self.report_group.occurences
223 223 instance_dict['group'][
224 224 'last_timestamp'] = self.report_group.last_timestamp
225 225 instance_dict['group'][
226 226 'first_timestamp'] = self.report_group.first_timestamp
227 227 instance_dict['group']['public'] = self.report_group.public
228 228 instance_dict['group']['fixed'] = self.report_group.fixed
229 229 instance_dict['group']['read'] = self.report_group.read
230 230 instance_dict['group'][
231 231 'average_duration'] = self.report_group.average_duration
232 232
233 233 instance_dict[
234 234 'resource_name'] = self.report_group.application.resource_name
235 235 instance_dict['report_type'] = self.report_type
236 236
237 237 if instance_dict['http_status'] == 404 and not instance_dict['error']:
238 238 instance_dict['error'] = '404 Not Found'
239 239
240 240 if details:
241 241 instance_dict['affected_users_count'] = \
242 242 ReportGroupService.affected_users_count(self.report_group)
243 243 instance_dict['top_affected_users'] = [
244 244 {'username': u.username, 'count': u.count} for u in
245 245 ReportGroupService.top_affected_users(self.report_group)]
246 246 instance_dict['application'] = {'integrations': []}
247 247 for integration in self.report_group.application.integrations:
248 248 if integration.front_visible:
249 249 instance_dict['application']['integrations'].append(
250 250 {'name': integration.integration_name,
251 251 'action': integration.integration_action})
252 252 instance_dict['comments'] = [c.get_dict() for c in
253 253 self.report_group.comments]
254 254
255 255 instance_dict['group']['next_report'] = None
256 256 instance_dict['group']['previous_report'] = None
257 next_in_group = self.get_next_in_group()
258 previous_in_group = self.get_previous_in_group()
257 next_in_group = self.get_next_in_group(request)
258 previous_in_group = self.get_previous_in_group(request)
259 259 if next_in_group:
260 instance_dict['group']['next_report'] = next_in_group.id
260 instance_dict['group']['next_report'] = next_in_group
261 261 if previous_in_group:
262 instance_dict['group']['previous_report'] = \
263 previous_in_group.id
262 instance_dict['group']['previous_report'] = previous_in_group
264 263
265 264 # slow call ordering
266 265 def find_parent(row, data):
267 266 for r in reversed(data):
268 267 try:
269 268 if (row['timestamp'] > r['timestamp'] and
270 269 row['end_time'] < r['end_time']):
271 270 return r
272 271 except TypeError as e:
273 272 log.warning('reports_view.find_parent: %s' % e)
274 273 return None
275 274
276 275 new_calls = []
277 276 calls = [c.get_dict() for c in self.slow_calls]
278 277 while calls:
279 278 # start from end
280 279 for x in range(len(calls) - 1, -1, -1):
281 280 parent = find_parent(calls[x], calls)
282 281 if parent:
283 282 parent['children'].append(calls[x])
284 283 else:
285 284 # no parent at all? append to new calls anyway
286 285 new_calls.append(calls[x])
287 286 # print 'append', calls[x]
288 287 del calls[x]
289 288 break
290 289 instance_dict['slow_calls'] = new_calls
291 290
292 291 instance_dict['front_url'] = self.get_public_url(request)
293 292
294 293 exclude_keys_list = exclude_keys or []
295 294 include_keys_list = include_keys or []
296 295 for k in list(instance_dict.keys()):
297 296 if k == 'group':
298 297 continue
299 298 if (k in exclude_keys_list or
300 299 (k not in include_keys_list and include_keys)):
301 300 del instance_dict[k]
302 301 return instance_dict
303 302
304 def get_previous_in_group(self):
305 start_day = self.report_group_time.date().replace(day=1)
306 end_day = start_day.replace(month=start_day.month+1)
307 query = self.report_group.reports.filter(Report.id < self.id)
308 query = query.filter(Report.report_group_time.between(
309 start_day, end_day))
310 return query.order_by(sa.desc(Report.id)).first()
311
312 def get_next_in_group(self):
313 start_day = self.report_group_time.date().replace(day=1)
314 end_day = start_day.replace(month=start_day.month+1)
315 query = self.report_group.reports.filter(Report.id > self.id)
316 query = query.filter(Report.report_group_time.between(
317 start_day, end_day))
318 return query.order_by(sa.asc(Report.id)).first()
303 def get_previous_in_group(self, request):
304 query = {
305 "size": 1,
306 "query": {
307 "filtered": {
308 "filter": {
309 "and": [{"term": {"group_id": self.group_id}},
310 {"range": {"pg_id": {"lt": self.id}}}]
311 }
312 }
313 },
314 "sort": [
315 {"_doc": {"order": "desc"}},
316 ],
317 }
318 result = request.es_conn.search(query, index=self.partition_id,
319 doc_type='report')
320 if result['hits']['total']:
321 return result['hits']['hits'][0]['_source']['pg_id']
322
323 def get_next_in_group(self, request):
324 query = {
325 "size": 1,
326 "query": {
327 "filtered": {
328 "filter": {
329 "and": [{"term": {"group_id": self.group_id}},
330 {"range": {"pg_id": {"gt": self.id}}}]
331 }
332 }
333 },
334 "sort": [
335 {"_doc": {"order": "asc"}},
336 ],
337 }
338 result = request.es_conn.search(query, index=self.partition_id,
339 doc_type='report')
340 if result['hits']['total']:
341 return result['hits']['hits'][0]['_source']['pg_id']
319 342
320 343 def get_public_url(self, request=None, report_group=None, _app_url=None):
321 344 """
322 345 Returns a URL that the user can use to visit a specific report
323 346 """
324 347 if not request:
325 348 request = get_current_request()
326 349 url = request.route_url('/', _app_url=_app_url)
327 350 if report_group:
328 351 return (url + 'ui/report/%s/%s') % (report_group.id, self.id)
329 352 return (url + 'ui/report/%s/%s') % (self.group_id, self.id)
330 353
331 354 def req_stats(self):
332 355 stats = self.request_stats.copy()
333 356 stats['percentages'] = {}
334 357 stats['percentages']['main'] = 100.0
335 358 main = stats.get('main', 0.0)
336 359 if not main:
337 360 return None
338 361 for name, call_time in stats.items():
339 362 if ('calls' not in name and 'main' not in name and
340 363 'percentages' not in name):
341 364 stats['main'] -= call_time
342 365 stats['percentages'][name] = math.floor(
343 366 (call_time / main * 100.0))
344 367 stats['percentages']['main'] -= stats['percentages'][name]
345 368 if stats['percentages']['main'] < 0.0:
346 369 stats['percentages']['main'] = 0.0
347 370 stats['main'] = 0.0
348 371 return stats
349 372
350 373 def generate_grouping_hash(self, hash_string=None, default_grouping=None,
351 374 protocol_version=None):
352 375 """
353 376 Generates SHA1 hash that will be used to group reports together
354 377 """
355 378 if not hash_string:
356 379 location = self.tags.get('view_name') or self.url_path
357 380 server_name = self.tags.get('server_name') or ''
358 381 if default_grouping == 'url_traceback':
359 382 hash_string = '%s_%s_%s' % (self.traceback_hash, location,
360 383 self.error)
361 384 if self.language == Language.javascript:
362 385 hash_string = '%s_%s' % (self.traceback_hash, self.error)
363 386
364 387 elif default_grouping == 'traceback_server':
365 388 hash_string = '%s_%s' % (self.traceback_hash, server_name)
366 389 if self.language == Language.javascript:
367 390 hash_string = '%s_%s' % (self.traceback_hash, server_name)
368 391 else:
369 392 hash_string = '%s_%s' % (self.error, location)
370 393 binary_string = hash_string.encode('utf8')
371 394 self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
372 395 return self.grouping_hash
373 396
374 397 def stripped_traceback(self):
375 398 """
376 399 Traceback without local vars
377 400 """
378 401 stripped_traceback = copy.deepcopy(self.traceback)
379 402
380 403 if isinstance(stripped_traceback, list):
381 404 for row in stripped_traceback:
382 405 row.pop('vars', None)
383 406 return stripped_traceback
384 407
385 408 def notify_channel(self, report_group):
386 409 """
387 410 Sends notification to websocket channel
388 411 """
389 412 settings = get_current_registry().settings
390 413 log.info('notify cometd')
391 414 if self.report_type != ReportType.error:
392 415 return
393 416 payload = {
394 417 'type': 'message',
395 418 "user": '__system__',
396 419 "channel": 'app_%s' % self.resource_id,
397 420 'message': {
398 421 'type': 'report',
399 422 'report': {
400 423 'group': {
401 424 'priority': report_group.priority,
402 425 'first_timestamp': report_group.first_timestamp,
403 426 'last_timestamp': report_group.last_timestamp,
404 427 'average_duration': report_group.average_duration,
405 428 'occurences': report_group.occurences
406 429 },
407 430 'report_id': self.id,
408 431 'group_id': self.group_id,
409 432 'resource_id': self.resource_id,
410 433 'http_status': self.http_status,
411 434 'url_domain': self.url_domain,
412 435 'url_path': self.url_path,
413 436 'error': self.error or '',
414 437 'server': self.tags.get('server_name'),
415 438 'view_name': self.tags.get('view_name'),
416 439 'front_url': self.get_public_url(),
417 440 }
418 441 }
419 442
420 443 }
421 444
422 445 cometd_request(settings['cometd.secret'], '/message', [payload],
423 446 servers=[settings['cometd_servers']])
424 447
425 448 def es_doc(self):
426 449 tags = {}
427 450 tag_list = []
428 451 for name, value in self.tags.items():
429 452 name = name.replace('.', '_')
430 453 tag_list.append(name)
431 454 tags[name] = {
432 455 "values": convert_es_type(value),
433 456 "numeric_values": value if (
434 457 isinstance(value, (int, float)) and
435 458 not isinstance(value, bool)) else None}
436 459
437 460 if 'user_name' not in self.tags and self.username:
438 461 tags["user_name"] = {"value": [self.username],
439 462 "numeric_value": None}
440 463 return {
441 464 '_id': str(self.id),
442 465 'pg_id': str(self.id),
443 466 'resource_id': self.resource_id,
444 467 'http_status': self.http_status or '',
445 468 'start_time': self.start_time,
446 469 'end_time': self.end_time,
447 470 'url_domain': self.url_domain if self.url_domain else '',
448 471 'url_path': self.url_path if self.url_path else '',
449 472 'duration': self.duration,
450 473 'error': self.error if self.error else '',
451 474 'report_type': self.report_type,
452 475 'request_id': self.request_id,
453 476 'ip': self.ip,
454 477 'group_id': str(self.group_id),
455 478 '_parent': str(self.group_id),
456 479 'tags': tags,
457 480 'tag_list': tag_list
458 481 }
459 482
460 483 @property
461 484 def partition_id(self):
462 485 return 'rcae_r_%s' % self.report_group_time.strftime('%Y_%m')
463 486
464 487
465 488 def after_insert(mapper, connection, target):
466 489 if not hasattr(target, '_skip_ft_index'):
467 490 data = target.es_doc()
468 491 data.pop('_id', None)
469 492 Datastores.es.index(target.partition_id, 'report', data,
470 493 parent=target.group_id, id=target.id)
471 494
472 495
473 496 def after_update(mapper, connection, target):
474 497 if not hasattr(target, '_skip_ft_index'):
475 498 data = target.es_doc()
476 499 data.pop('_id', None)
477 500 Datastores.es.index(target.partition_id, 'report', data,
478 501 parent=target.group_id, id=target.id)
479 502
480 503
481 504 def after_delete(mapper, connection, target):
482 505 if not hasattr(target, '_skip_ft_index'):
483 506 query = {'term': {'pg_id': target.id}}
484 507 Datastores.es.delete_by_query(target.partition_id, 'report', query)
485 508
486 509
487 510 sa.event.listen(Report, 'after_insert', after_insert)
488 511 sa.event.listen(Report, 'after_update', after_update)
489 512 sa.event.listen(Report, 'after_delete', after_delete)
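
For reference, the index the new previous/next queries target is the monthly report partition derived from report_group_time, exactly as the partition_id property above computes it. A small illustration with a made-up timestamp:

    from datetime import datetime

    # Illustration only: mirrors the partition_id property above
    # ('rcae_r_' + report_group_time formatted as YYYY_MM).
    report_group_time = datetime(2016, 5, 17)  # example value
    partition = 'rcae_r_%s' % report_group_time.strftime('%Y_%m')
    # partition == 'rcae_r_2016_05'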