channelstream: separate element and basic pubsub
ergo -


@@ -1,495 +1,495 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 """
23 23 Utility functions.
24 24 """
25 25 import logging
26 26 import requests
27 27 import hashlib
28 28 import json
29 29 import copy
30 30 import uuid
31 31 import appenlight.lib.helpers as h
32 32 from collections import namedtuple
33 33 from datetime import timedelta, datetime, date
34 34 from dogpile.cache.api import NO_VALUE
35 35 from appenlight.models import Datastores
36 36 from appenlight.validators import (LogSearchSchema,
37 37 TagListSchema,
38 38 accepted_search_params)
39 39 from itsdangerous import TimestampSigner
40 40 from ziggurat_foundations.permissions import ALL_PERMISSIONS
41 41 from dateutil.relativedelta import relativedelta
42 42 from dateutil.rrule import rrule, MONTHLY, DAILY
43 43
44 44 log = logging.getLogger(__name__)
45 45
46 46
47 47 Stat = namedtuple('Stat', 'start_interval value')
48 48
49 49
50 50 def default_extractor(item):
51 51 """
52 52 :param item - item to extract date from
53 53 """
54 54 if hasattr(item, 'start_interval'):
55 55 return item.start_interval
56 56 return item['start_interval']
57 57
58 58
59 59 # fast gap generator
60 60 def gap_gen_default(start, step, itemiterator, end_time=None,
61 61 iv_extractor=None):
62 62 """ generates a list of time/value items based on step and itemiterator;
63 63 if an entry is missing from the iterator, a time/None pair is returned
64 64 instead
65 65 :param start - datetime - what time should we start generating our values
66 66 :param step - timedelta - stepsize
67 67 :param itemiterator - iterable - we will check this iterable for values
68 68 corresponding to generated steps
69 69 :param end_time - datetime - when last step is >= end_time stop iterating
70 70 :param iv_extractor - extracts current step from iterable items
71 71 """
72 72
73 73 if not iv_extractor:
74 74 iv_extractor = default_extractor
75 75
76 76 next_step = start
77 77 minutes = step.total_seconds() / 60.0
78 78 while next_step.minute % minutes != 0:
79 79 next_step = next_step.replace(minute=next_step.minute - 1)
80 80 for item in itemiterator:
81 81 item_start_interval = iv_extractor(item)
82 82 # do we have a match for current time step in our data?
83 83 # if not, generate a new tuple with a None value
84 84 while next_step < item_start_interval:
85 85 yield Stat(next_step, None)
86 86 next_step = next_step + step
87 87 if next_step == item_start_interval:
88 88 yield Stat(item_start_interval, item)
89 89 next_step = next_step + step
90 90 if end_time:
91 91 while next_step < end_time:
92 92 yield Stat(next_step, None)
93 93 next_step = next_step + step
94 94
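A minimal usage sketch of the gap generator above (not part of this commit); the sample items are made up and only carry the 'start_interval' key the default extractor expects.

from datetime import datetime, timedelta
from appenlight.lib.utils import gap_gen_default

items = [{'start_interval': datetime(2016, 1, 1, 10, 0), 'value': 3},
         {'start_interval': datetime(2016, 1, 1, 10, 10), 'value': 7}]
series = list(gap_gen_default(datetime(2016, 1, 1, 10, 0),
                              timedelta(minutes=5), items,
                              end_time=datetime(2016, 1, 1, 10, 20)))
# yields Stat(10:00, item), Stat(10:05, None), Stat(10:10, item), Stat(10:15, None)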
95 95
96 96 class DateTimeEncoder(json.JSONEncoder):
97 97 """ Simple datetime to ISO encoder for json serialization"""
98 98
99 99 def default(self, obj):
100 100 if isinstance(obj, date):
101 101 return obj.isoformat()
102 102 if isinstance(obj, datetime):
103 103 return obj.isoformat()
104 104 return json.JSONEncoder.default(self, obj)
105 105
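Hypothetical usage of the encoder above (the payload values are illustrative):

import json
from datetime import datetime
from appenlight.lib.utils import DateTimeEncoder

json.dumps({'at': datetime(2016, 5, 1, 12, 30)}, cls=DateTimeEncoder)
# '{"at": "2016-05-01T12:30:00"}'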
106 106
107 def cometd_request(secret, endpoint, payload, throw_exceptions=False,
107 def channelstream_request(secret, endpoint, payload, throw_exceptions=False,
108 108 servers=None):
109 109 responses = []
110 110 if not servers:
111 111 servers = []
112 112
113 113 signer = TimestampSigner(secret)
114 114 sig_for_server = signer.sign(endpoint)
115 115 for secret, server in [(s['secret'], s['server']) for s in servers]:
116 116 response = {}
117 117 secret_headers = {'x-channelstream-secret': sig_for_server,
118 118 'x-channelstream-endpoint': endpoint,
119 119 'Content-Type': 'application/json'}
120 120 url = '%s%s' % (server, endpoint)
121 121 try:
122 122 response = requests.post(url,
123 123 data=json.dumps(payload,
124 124 cls=DateTimeEncoder),
125 125 headers=secret_headers,
126 126 verify=False,
127 127 timeout=2).json()
128 128 except requests.exceptions.RequestException as e:
129 129 if throw_exceptions:
130 130 raise
131 131 responses.append(response)
132 132 return responses
133 133
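A hedged sketch of calling the renamed helper; the server address and secret below are placeholders, not values taken from this commit.

from appenlight.lib.utils import channelstream_request

servers = [{'server': 'http://127.0.0.1:8000', 'secret': 'secret'}]
payload = {'type': 'message',
           'channel': 'app_1',
           'message': {'topic': 'front_dashboard.new_topic'}}
# posts the signed payload to <server>/message on every configured server
responses = channelstream_request('secret', '/message', [payload],
                                  servers=servers)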
134 134
135 135 def add_cors_headers(response):
136 136 # allow CORS
137 137 response.headers.add('Access-Control-Allow-Origin', '*')
138 138 response.headers.add('XDomainRequestAllowed', '1')
139 139 response.headers.add('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
140 140 # response.headers.add('Access-Control-Allow-Credentials', 'true')
141 141 response.headers.add('Access-Control-Allow-Headers',
142 142 'Content-Type, Depth, User-Agent, X-File-Size, X-Requested-With, If-Modified-Since, X-File-Name, Cache-Control, Pragma, Origin, Connection, Referer, Cookie')
143 143 response.headers.add('Access-Control-Max-Age', '86400')
144 144
145 145
146 146 from sqlalchemy.sql import compiler
147 147 from psycopg2.extensions import adapt as sqlescape
148 148
149 149
151 151 # or use the appropriate escape function from your db driver
151 151
152 152 def compile_query(query):
153 153 dialect = query.session.bind.dialect
154 154 statement = query.statement
155 155 comp = compiler.SQLCompiler(dialect, statement)
156 156 comp.compile()
157 157 enc = dialect.encoding
158 158 params = {}
159 159 for k, v in comp.params.items():
160 160 if isinstance(v, str):
161 161 v = v.encode(enc)
162 162 params[k] = sqlescape(v)
163 163 return (comp.string.encode(enc) % params).decode(enc)
164 164
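Illustrative use of compile_query for debug output; `DBSession` and the mapped `Report` class are assumptions standing in for the application's session and a model, and are not defined in this diff.

from appenlight.lib.utils import compile_query

# hypothetical ORM query; any query bound to a session should work here
query = DBSession.query(Report).filter(Report.http_status == 500)
# prints the statement with bound parameters inlined
print(compile_query(query))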
165 165
166 166 def convert_es_type(input_data):
167 167 """
168 168 This might need to convert some text or other types to corresponding ES types
169 169 """
170 170 return str(input_data)
171 171
172 172
173 173 ProtoVersion = namedtuple('ProtoVersion', ['major', 'minor', 'patch'])
174 174
175 175
176 176 def parse_proto(input_data):
177 177 try:
178 178 parts = [int(x) for x in input_data.split('.')]
179 179 while len(parts) < 3:
180 180 parts.append(0)
181 181 return ProtoVersion(*parts)
182 182 except Exception as e:
183 183 log.info('Unknown protocol version: %s' % e)
184 184 return ProtoVersion(99, 99, 99)
185 185
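Quick illustration of the padding and fallback behaviour of parse_proto:

from appenlight.lib.utils import parse_proto

parse_proto('0.5')      # ProtoVersion(major=0, minor=5, patch=0)
parse_proto('unknown')  # falls back to ProtoVersion(major=99, minor=99, patch=99)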
186 186
187 187 def es_index_name_limiter(start_date=None, end_date=None, months_in_past=6,
188 188 ixtypes=None):
189 189 """
190 190 This function limits the search to 6 months by default so we don't have to
191 191 query 300 elasticsearch indices for 20 years of historical data for example
192 192 """
193 193
194 194 # should be cached later
195 195 def get_possible_names():
196 196 return list(Datastores.es.aliases().keys())
197 197
198 198 possible_names = get_possible_names()
199 199 es_index_types = []
200 200 if not ixtypes:
201 201 ixtypes = ['reports', 'metrics', 'logs']
202 202 for t in ixtypes:
203 203 if t == 'reports':
204 204 es_index_types.append('rcae_r_%s')
205 205 elif t == 'logs':
206 206 es_index_types.append('rcae_l_%s')
207 207 elif t == 'metrics':
208 208 es_index_types.append('rcae_m_%s')
209 209 elif t == 'uptime':
210 210 es_index_types.append('rcae_u_%s')
211 211 elif t == 'slow_calls':
212 212 es_index_types.append('rcae_sc_%s')
213 213
214 214 if start_date:
215 215 start_date = copy.copy(start_date)
216 216 else:
217 217 if not end_date:
218 218 end_date = datetime.utcnow()
219 219 start_date = end_date + relativedelta(months=months_in_past * -1)
220 220
221 221 if not end_date:
222 222 end_date = start_date + relativedelta(months=months_in_past)
223 223
224 224 index_dates = list(rrule(MONTHLY,
225 225 dtstart=start_date.date().replace(day=1),
226 226 until=end_date.date(),
227 227 count=36))
228 228 index_names = []
229 229 for ix_type in es_index_types:
230 230 to_extend = [ix_type % d.strftime('%Y_%m') for d in index_dates
231 231 if ix_type % d.strftime('%Y_%m') in possible_names]
232 232 index_names.extend(to_extend)
233 233 for day in list(rrule(DAILY, dtstart=start_date.date(),
234 234 until=end_date.date(), count=366)):
235 235 ix_name = ix_type % day.strftime('%Y_%m_%d')
236 236 if ix_name in possible_names:
237 237 index_names.append(ix_name)
238 238 return index_names
239 239
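A hedged sketch of narrowing an ES query to report indices for a two-month window; it assumes the Datastores.es connection is configured, and only index names whose aliases actually exist are returned.

from datetime import datetime
from appenlight.lib.utils import es_index_name_limiter

names = es_index_name_limiter(start_date=datetime(2016, 3, 1),
                              end_date=datetime(2016, 4, 30),
                              ixtypes=['reports'])
# e.g. ['rcae_r_2016_03', 'rcae_r_2016_04'] when those monthly aliases exist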
240 240
241 241 def build_filter_settings_from_query_dict(
242 242 request, params=None, override_app_ids=None,
243 243 resource_permissions=None):
244 244 """
245 245 Builds list of normalized search terms for ES from query params
246 246 ensuring application list is restricted to only applications user
247 247 has access to
248 248
249 249 :param params (dictionary)
250 250 :param override_app_ids - list of application id's to use instead of
251 251 applications user normally has access to
252 252 """
253 253 params = copy.deepcopy(params)
254 254 applications = []
255 255 if not resource_permissions:
256 256 resource_permissions = ['view']
257 257
258 258 if request.user:
259 259 applications = request.user.resources_with_perms(
260 260 resource_permissions, resource_types=['application'])
261 261
262 262 # CRITICAL - this ensures our resultset is limited to only the ones
263 263 # user has view permissions
264 264 all_possible_app_ids = set([app.resource_id for app in applications])
265 265
266 266 # if override is present we force permission for the app to be present
267 267 # this allows users to see dashboards and applications they would
268 268 # normally not be able to
269 269
270 270 if override_app_ids:
271 271 all_possible_app_ids = set(override_app_ids)
272 272
273 273 schema = LogSearchSchema().bind(resources=all_possible_app_ids)
274 274 tag_schema = TagListSchema()
275 275 filter_settings = schema.deserialize(params)
276 276 tag_list = []
277 277 for k, v in list(filter_settings.items()):
278 278 if k in accepted_search_params:
279 279 continue
280 280 tag_list.append({"name": k, "value": v, "op": 'eq'})
281 281 # remove the key from filter_settings
282 282 filter_settings.pop(k, None)
283 283 tags = tag_schema.deserialize(tag_list)
284 284 filter_settings['tags'] = tags
285 285 return filter_settings
286 286
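Illustrative call from a Pyramid view; `request` is assumed to be the current request and the query string is made up.

from appenlight.lib.utils import build_filter_settings_from_query_dict

# e.g. GET /reports?start_date=2016-05-01&color=red
filter_settings = build_filter_settings_from_query_dict(
    request, params=request.GET.mixed())
# params outside accepted_search_params (like 'color') are stripped from the
# result and re-added under filter_settings['tags'] as 'eq' tag filters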
287 287
288 288 def gen_uuid():
289 289 return str(uuid.uuid4())
290 290
291 291
292 292 def gen_uuid4_sha_hex():
293 293 return hashlib.sha1(uuid.uuid4().bytes).hexdigest()
294 294
295 295
296 296 def permission_tuple_to_dict(data):
297 297 out = {
298 298 "user_name": None,
299 299 "perm_name": data.perm_name,
300 300 "owner": data.owner,
301 301 "type": data.type,
302 302 "resource_name": None,
303 303 "resource_type": None,
304 304 "resource_id": None,
305 305 "group_name": None,
306 306 "group_id": None
307 307 }
308 308 if data.user:
309 309 out["user_name"] = data.user.user_name
310 310 if data.perm_name == ALL_PERMISSIONS:
311 311 out['perm_name'] = '__all_permissions__'
312 312 if data.resource:
313 313 out['resource_name'] = data.resource.resource_name
314 314 out['resource_type'] = data.resource.resource_type
315 315 out['resource_id'] = data.resource.resource_id
316 316 if data.group:
317 317 out['group_name'] = data.group.group_name
318 318 out['group_id'] = data.group.id
319 319 return out
320 320
321 321
322 322 def get_cached_buckets(request, stats_since, end_time, fn, cache_key,
323 323 gap_gen=None, db_session=None, step_interval=None,
324 324 iv_extractor=None,
325 325 rerange=False, *args, **kwargs):
326 326 """ Takes "fn" that should return some data and tries to load the data
327 327 dividing it into daily buckets - if the stats_since and end time give a
328 328 delta bigger than 24 hours, then only "today's" data is computed on the fly
329 329
330 330 :param request: (request) request object
331 331 :param stats_since: (datetime) start date of buckets range
332 332 :param end_time: (datetime) end date of buckets range - utcnow() if None
333 333 :param fn: (callable) callable to use to populate buckets should have
334 334 following signature:
335 335 def get_data(request, since_when, until, *args, **kwargs):
336 336
337 337 :param cache_key: (string) cache key that will be used to build bucket
338 338 caches
339 339 :param gap_gen: (callable) gap generator - should return step intervals
340 340 to use with our `fn` callable
341 341 :param db_session: (Session) sqlalchemy session
342 342 :param step_interval: (timedelta) optional step interval if we want to
343 343 override the default determined from total start/end time delta
344 344 :param iv_extractor: (callable) used to get step intervals from data
345 345 returned by `fn` callable
346 346 :param rerange: (bool) handy if we want to change ranges from hours to
347 347 days when cached data is missing - will shorten execution time if `fn`
348 348 callable supports that and we are working with multiple rows - like metrics
349 349 :param args:
350 350 :param kwargs:
351 351
352 352 :return: iterable
353 353 """
354 354 if not end_time:
355 355 end_time = datetime.utcnow().replace(second=0, microsecond=0)
356 356 delta = end_time - stats_since
357 357 # if smaller than 3 days we want to group by 5min else by 1h,
358 358 # for 60 min group by min
359 359 if not gap_gen:
360 360 gap_gen = gap_gen_default
361 361 if not iv_extractor:
362 362 iv_extractor = default_extractor
363 363
364 364 # do not use custom interval if total time range with new iv would exceed
365 365 # end time
366 366 if not step_interval or stats_since + step_interval >= end_time:
367 367 if delta < h.time_deltas.get('12h')['delta']:
368 368 step_interval = timedelta(seconds=60)
369 369 elif delta < h.time_deltas.get('3d')['delta']:
370 370 step_interval = timedelta(seconds=60 * 5)
371 371 elif delta > h.time_deltas.get('2w')['delta']:
372 372 step_interval = timedelta(days=1)
373 373 else:
374 374 step_interval = timedelta(minutes=60)
375 375
376 376 if step_interval >= timedelta(minutes=60):
377 377 log.info('cached_buckets:{}: adjusting start time '
378 378 'for hourly or daily intervals'.format(cache_key))
379 379 stats_since = stats_since.replace(hour=0, minute=0)
380 380
381 381 ranges = [i.start_interval for i in list(gap_gen(stats_since,
382 382 step_interval, [],
383 383 end_time=end_time))]
384 384 buckets = {}
385 385 storage_key = 'buckets:' + cache_key + '{}|{}'
386 386 # this means we basically cache per hour in 3-14 day intervals but i think
387 387 # it's fine at this point - will be faster than db access anyway
388 388
389 389 if len(ranges) >= 1:
390 390 last_ranges = [ranges[-1]]
391 391 else:
392 392 last_ranges = []
393 393 if step_interval >= timedelta(minutes=60):
394 394 for r in ranges:
395 395 k = storage_key.format(step_interval.total_seconds(), r)
396 396 value = request.registry.cache_regions.redis_day_30.get(k)
397 397 # last buckets are never loaded from cache
398 398 is_last_result = (
399 399 r >= end_time - timedelta(hours=6) or r in last_ranges)
400 400 if value is not NO_VALUE and not is_last_result:
401 401 log.info("cached_buckets:{}: "
402 402 "loading range {} from cache".format(cache_key, r))
403 403 buckets[r] = value
404 404 else:
405 405 log.info("cached_buckets:{}: "
406 406 "loading range {} from storage".format(cache_key, r))
407 407 range_size = step_interval
408 408 if (step_interval == timedelta(minutes=60) and
409 409 not is_last_result and rerange):
410 410 range_size = timedelta(days=1)
411 411 r = r.replace(hour=0, minute=0)
412 412 log.info("cached_buckets:{}: "
413 413 "loading collapsed "
414 414 "range {} {}".format(cache_key, r,
415 415 r + range_size))
416 416 bucket_data = fn(
417 417 request, r, r + range_size, step_interval,
418 418 gap_gen, bucket_count=len(ranges), *args, **kwargs)
419 419 for b in bucket_data:
420 420 b_iv = iv_extractor(b)
421 421 buckets[b_iv] = b
422 422 k2 = storage_key.format(
423 423 step_interval.total_seconds(), b_iv)
424 424 request.registry.cache_regions.redis_day_30.set(k2, b)
425 425 log.info("cached_buckets:{}: saving cache".format(cache_key))
426 426 else:
427 427 # bucket count is 1 for short time ranges <= 24h from now
428 428 bucket_data = fn(request, stats_since, end_time, step_interval,
429 429 gap_gen, bucket_count=1, *args, **kwargs)
430 430 for b in bucket_data:
431 431 buckets[iv_extractor(b)] = b
432 432 return buckets
433 433
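A hedged sketch of wiring a data callable into get_cached_buckets; `get_report_stats` is purely illustrative, `request` stands for the current Pyramid request, and the callable must accept the signature the docstring above describes.

from datetime import datetime, timedelta
from appenlight.lib.utils import get_cached_buckets

def get_report_stats(request, since_when, until, step_interval,
                     gap_gen, bucket_count=None, **kwargs):
    # would normally query ES/DB and return rows keyed by 'start_interval'
    return []

buckets = get_cached_buckets(
    request,
    stats_since=datetime.utcnow() - timedelta(days=7),
    end_time=None,
    fn=get_report_stats,
    cache_key='reports_per_app_1')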
434 434
435 435 def get_cached_split_data(request, stats_since, end_time, fn, cache_key,
436 436 db_session=None, *args, **kwargs):
437 437 """ Takes "fn" that should return some data and tries to load the data
438 438 dividing it into 2 buckets - cached "since_from" bucket and "today"
439 439 bucket - then the data can be reduced into single value
440 440
441 441 Data is cached if the stats_since and end time give a delta bigger
442 442 than 24 hours - then only 24h is computed on the fly
443 443 """
444 444 if not end_time:
445 445 end_time = datetime.utcnow().replace(second=0, microsecond=0)
446 446 delta = end_time - stats_since
447 447
448 448 if delta >= timedelta(minutes=60):
449 449 log.info('cached_split_data:{}: adjusting start time '
450 450 'for hourly or daily intervals'.format(cache_key))
451 451 stats_since = stats_since.replace(hour=0, minute=0)
452 452
453 453 storage_key = 'buckets_split_data:' + cache_key + ':{}|{}'
454 454 old_end_time = end_time.replace(hour=0, minute=0)
455 455
456 456 final_storage_key = storage_key.format(delta.total_seconds(),
457 457 old_end_time)
458 458 older_data = None
459 459
460 460 cdata = request.registry.cache_regions.redis_day_7.get(
461 461 final_storage_key)
462 462
463 463 if cdata:
464 464 log.info("cached_split_data:{}: found old "
465 465 "bucket data".format(cache_key))
466 466 older_data = cdata
467 467
468 468 if (stats_since < end_time - h.time_deltas.get('24h')['delta'] and
469 469 not cdata):
470 470 log.info("cached_split_data:{}: didn't find the "
471 471 "start bucket in cache so load older data".format(cache_key))
472 472 recent_stats_since = old_end_time
473 473 older_data = fn(request, stats_since, recent_stats_since,
474 474 db_session=db_session, *args, **kwargs)
475 475 request.registry.cache_regions.redis_day_7.set(final_storage_key,
476 476 older_data)
477 477 elif stats_since < end_time - h.time_deltas.get('24h')['delta']:
478 478 recent_stats_since = old_end_time
479 479 else:
480 480 recent_stats_since = stats_since
481 481
482 482 log.info("cached_split_data:{}: loading fresh "
483 483 "data buckets from last 24h ".format(cache_key))
484 484 todays_data = fn(request, recent_stats_since, end_time,
485 485 db_session=db_session, *args, **kwargs)
486 486 return older_data, todays_data
487 487
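Hedged usage sketch for the split-data helper; `count_reports` is an illustrative callable returning a single number for the given range, and `request` is the current request with cache regions configured.

from datetime import datetime, timedelta
from appenlight.lib.utils import get_cached_split_data

def count_reports(request, since_when, until, db_session=None, **kwargs):
    return 0  # would normally count rows for the given time range

older, todays = get_cached_split_data(
    request,
    stats_since=datetime.utcnow() - timedelta(days=30),
    end_time=None,
    fn=count_reports,
    cache_key='report_count_app_1')
total = (older or 0) + (todays or 0)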
488 488
489 489 def in_batches(seq, size):
490 490 """
491 491 Splits an iterable into batches of the specified size
492 492 :param seq (iterable)
493 493 :param size integer
494 494 """
495 495 return (seq[pos:pos + size] for pos in range(0, len(seq), size))
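Simple illustration of the batching helper:

from appenlight.lib.utils import in_batches

list(in_batches([1, 2, 3, 4, 5], 2))  # [[1, 2], [3, 4], [5]]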
@@ -1,512 +1,511 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 from datetime import datetime
23 23 import math
24 24 import uuid
25 25 import hashlib
26 26 import copy
27 27 import urllib.parse
28 28 import logging
29 29 import sqlalchemy as sa
30 30
31 31 from appenlight.models import Base, Datastores
32 32 from appenlight.lib.utils.date_utils import convert_date
33 33 from appenlight.lib.utils import convert_es_type
34 34 from appenlight.models.slow_call import SlowCall
35 from appenlight.lib.utils import cometd_request
35 from appenlight.lib.utils import channelstream_request
36 36 from appenlight.lib.enums import ReportType, Language
37 37 from pyramid.threadlocal import get_current_registry, get_current_request
38 38 from sqlalchemy.dialects.postgresql import JSON
39 39 from ziggurat_foundations.models.base import BaseModel
40 40
41 41 log = logging.getLogger(__name__)
42 42
43 43 REPORT_TYPE_MATRIX = {
44 44 'http_status': {"type": 'int',
45 45 "ops": ('eq', 'ne', 'ge', 'le',)},
46 46 'group:priority': {"type": 'int',
47 47 "ops": ('eq', 'ne', 'ge', 'le',)},
48 48 'duration': {"type": 'float',
49 49 "ops": ('ge', 'le',)},
50 50 'url_domain': {"type": 'unicode',
51 51 "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
52 52 'url_path': {"type": 'unicode',
53 53 "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
54 54 'error': {"type": 'unicode',
55 55 "ops": ('eq', 'ne', 'startswith', 'endswith', 'contains',)},
56 56 'tags:server_name': {"type": 'unicode',
57 57 "ops": ('eq', 'ne', 'startswith', 'endswith',
58 58 'contains',)},
59 59 'traceback': {"type": 'unicode',
60 60 "ops": ('contains',)},
61 61 'group:occurences': {"type": 'int',
62 62 "ops": ('eq', 'ne', 'ge', 'le',)}
63 63 }
64 64
65 65
66 66 class Report(Base, BaseModel):
67 67 __tablename__ = 'reports'
68 68 __table_args__ = {'implicit_returning': False}
69 69
70 70 id = sa.Column(sa.Integer, nullable=False, primary_key=True)
71 71 group_id = sa.Column(sa.BigInteger,
72 72 sa.ForeignKey('reports_groups.id', ondelete='cascade',
73 73 onupdate='cascade'))
74 74 resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
75 75 report_type = sa.Column(sa.Integer(), nullable=False, index=True)
76 76 error = sa.Column(sa.UnicodeText(), index=True)
77 77 extra = sa.Column(JSON(), default={})
78 78 request = sa.Column(JSON(), nullable=False, default={})
79 79 ip = sa.Column(sa.String(39), index=True, default='')
80 80 username = sa.Column(sa.Unicode(255), default='')
81 81 user_agent = sa.Column(sa.Unicode(255), default='')
82 82 url = sa.Column(sa.UnicodeText(), index=True)
83 83 request_id = sa.Column(sa.Text())
84 84 request_stats = sa.Column(JSON(), nullable=False, default={})
85 85 traceback = sa.Column(JSON(), nullable=False, default=None)
86 86 traceback_hash = sa.Column(sa.Text())
87 87 start_time = sa.Column(sa.DateTime(), default=datetime.utcnow,
88 88 server_default=sa.func.now())
89 89 end_time = sa.Column(sa.DateTime())
90 90 duration = sa.Column(sa.Float, default=0)
91 91 http_status = sa.Column(sa.Integer, index=True)
92 92 url_domain = sa.Column(sa.Unicode(100), index=True)
93 93 url_path = sa.Column(sa.Unicode(255), index=True)
94 94 tags = sa.Column(JSON(), nullable=False, default={})
95 95 language = sa.Column(sa.Integer(), default=0)
96 96 # this is used to determine partition for the report
97 97 report_group_time = sa.Column(sa.DateTime(), default=datetime.utcnow,
98 98 server_default=sa.func.now())
99 99
100 100 logs = sa.orm.relationship(
101 101 'Log',
102 102 lazy='dynamic',
103 103 passive_deletes=True,
104 104 passive_updates=True,
105 105 primaryjoin="and_(Report.request_id==Log.request_id, "
106 106 "Log.request_id != None, Log.request_id != '')",
107 107 foreign_keys='[Log.request_id]')
108 108
109 109 slow_calls = sa.orm.relationship('SlowCall',
110 110 backref='detail',
111 111 cascade="all, delete-orphan",
112 112 passive_deletes=True,
113 113 passive_updates=True,
114 114 order_by='SlowCall.timestamp')
115 115
116 116 def set_data(self, data, resource, protocol_version=None):
117 117 self.http_status = data['http_status']
118 118 self.priority = data['priority']
119 119 self.error = data['error']
120 120 report_language = data.get('language', '').lower()
121 121 self.language = getattr(Language, report_language, Language.unknown)
122 122 # we need temp holder here to decide later
123 123 # if we want to commit the tags if the report is marked for creation
124 124 self.tags = {
125 125 'server_name': data['server'],
126 126 'view_name': data['view_name']
127 127 }
128 128 if data.get('tags'):
129 129 for tag_tuple in data['tags']:
130 130 self.tags[tag_tuple[0]] = tag_tuple[1]
131 131 self.traceback = data['traceback']
132 132 stripped_traceback = self.stripped_traceback()
133 133 tb_repr = repr(stripped_traceback).encode('utf8')
134 134 self.traceback_hash = hashlib.sha1(tb_repr).hexdigest()
135 135 url_info = urllib.parse.urlsplit(
136 136 data.get('url', ''), allow_fragments=False)
137 137 self.url_domain = url_info.netloc[:128]
138 138 self.url_path = url_info.path[:2048]
139 139 self.occurences = data['occurences']
140 140 if self.error:
141 141 self.report_type = ReportType.error
142 142 else:
143 143 self.report_type = ReportType.slow
144 144
145 145 # but if its status is 404, it's the 404 type
146 146 if self.http_status in [404, '404'] or self.error == '404 Not Found':
147 147 self.report_type = ReportType.not_found
148 148 self.error = ''
149 149
150 150 self.generate_grouping_hash(data.get('appenlight.group_string',
151 151 data.get('group_string')),
152 152 resource.default_grouping,
153 153 protocol_version)
154 154
155 155 # details
156 156 if data['http_status'] in [404, '404']:
157 157 data = {"username": data["username"],
158 158 "ip": data["ip"],
159 159 "url": data["url"],
160 160 "user_agent": data["user_agent"]}
161 161 if data.get('HTTP_REFERER') or data.get('http_referer'):
162 162 data['HTTP_REFERER'] = data.get(
163 163 'HTTP_REFERER', '') or data.get('http_referer', '')
164 164
165 165 self.resource_id = resource.resource_id
166 166 self.username = data['username']
167 167 self.user_agent = data['user_agent']
168 168 self.ip = data['ip']
169 169 self.extra = {}
170 170 if data.get('extra'):
171 171 for extra_tuple in data['extra']:
172 172 self.extra[extra_tuple[0]] = extra_tuple[1]
173 173
174 174 self.url = data['url']
175 175 self.request_id = data.get('request_id', '').replace('-', '') or str(
176 176 uuid.uuid4())
177 177 request_data = data.get('request', {})
178 178
179 179 self.request = request_data
180 180 self.request_stats = data.get('request_stats', {})
181 181 traceback = data.get('traceback')
182 182 if not traceback:
183 183 traceback = data.get('frameinfo')
184 184 self.traceback = traceback
185 185 start_date = convert_date(data.get('start_time'))
186 186 if not self.start_time or self.start_time < start_date:
187 187 self.start_time = start_date
188 188
189 189 self.end_time = convert_date(data.get('end_time'), False)
190 190 self.duration = 0
191 191
192 192 if self.start_time and self.end_time:
193 193 d = self.end_time - self.start_time
194 194 self.duration = d.total_seconds()
195 195
196 196 # update tags with other vars
197 197 if self.username:
198 198 self.tags['user_name'] = self.username
199 199 self.tags['report_language'] = Language.key_from_value(self.language)
200 200
201 201 def add_slow_calls(self, data, report_group):
202 202 slow_calls = []
203 203 for call in data.get('slow_calls', []):
204 204 sc_inst = SlowCall()
205 205 sc_inst.set_data(call, resource_id=self.resource_id,
206 206 report_group=report_group)
207 207 slow_calls.append(sc_inst)
208 208 self.slow_calls.extend(slow_calls)
209 209 return slow_calls
210 210
211 211 def get_dict(self, request, details=False, exclude_keys=None,
212 212 include_keys=None):
213 213 from appenlight.models.services.report_group import ReportGroupService
214 214 instance_dict = super(Report, self).get_dict()
215 215 instance_dict['req_stats'] = self.req_stats()
216 216 instance_dict['group'] = {}
217 217 instance_dict['group']['id'] = self.report_group.id
218 218 instance_dict['group'][
219 219 'total_reports'] = self.report_group.total_reports
220 220 instance_dict['group']['last_report'] = self.report_group.last_report
221 221 instance_dict['group']['priority'] = self.report_group.priority
222 222 instance_dict['group']['occurences'] = self.report_group.occurences
223 223 instance_dict['group'][
224 224 'last_timestamp'] = self.report_group.last_timestamp
225 225 instance_dict['group'][
226 226 'first_timestamp'] = self.report_group.first_timestamp
227 227 instance_dict['group']['public'] = self.report_group.public
228 228 instance_dict['group']['fixed'] = self.report_group.fixed
229 229 instance_dict['group']['read'] = self.report_group.read
230 230 instance_dict['group'][
231 231 'average_duration'] = self.report_group.average_duration
232 232
233 233 instance_dict[
234 234 'resource_name'] = self.report_group.application.resource_name
235 235 instance_dict['report_type'] = self.report_type
236 236
237 237 if instance_dict['http_status'] == 404 and not instance_dict['error']:
238 238 instance_dict['error'] = '404 Not Found'
239 239
240 240 if details:
241 241 instance_dict['affected_users_count'] = \
242 242 ReportGroupService.affected_users_count(self.report_group)
243 243 instance_dict['top_affected_users'] = [
244 244 {'username': u.username, 'count': u.count} for u in
245 245 ReportGroupService.top_affected_users(self.report_group)]
246 246 instance_dict['application'] = {'integrations': []}
247 247 for integration in self.report_group.application.integrations:
248 248 if integration.front_visible:
249 249 instance_dict['application']['integrations'].append(
250 250 {'name': integration.integration_name,
251 251 'action': integration.integration_action})
252 252 instance_dict['comments'] = [c.get_dict() for c in
253 253 self.report_group.comments]
254 254
255 255 instance_dict['group']['next_report'] = None
256 256 instance_dict['group']['previous_report'] = None
257 257 next_in_group = self.get_next_in_group(request)
258 258 previous_in_group = self.get_previous_in_group(request)
259 259 if next_in_group:
260 260 instance_dict['group']['next_report'] = next_in_group
261 261 if previous_in_group:
262 262 instance_dict['group']['previous_report'] = previous_in_group
263 263
264 264 # slow call ordering
265 265 def find_parent(row, data):
266 266 for r in reversed(data):
267 267 try:
268 268 if (row['timestamp'] > r['timestamp'] and
269 269 row['end_time'] < r['end_time']):
270 270 return r
271 271 except TypeError as e:
272 272 log.warning('reports_view.find_parent: %s' % e)
273 273 return None
274 274
275 275 new_calls = []
276 276 calls = [c.get_dict() for c in self.slow_calls]
277 277 while calls:
278 278 # start from end
279 279 for x in range(len(calls) - 1, -1, -1):
280 280 parent = find_parent(calls[x], calls)
281 281 if parent:
282 282 parent['children'].append(calls[x])
283 283 else:
284 284 # no parent at all? append to new calls anyway
285 285 new_calls.append(calls[x])
286 286 # print 'append', calls[x]
287 287 del calls[x]
288 288 break
289 289 instance_dict['slow_calls'] = new_calls
290 290
291 291 instance_dict['front_url'] = self.get_public_url(request)
292 292
293 293 exclude_keys_list = exclude_keys or []
294 294 include_keys_list = include_keys or []
295 295 for k in list(instance_dict.keys()):
296 296 if k == 'group':
297 297 continue
298 298 if (k in exclude_keys_list or
299 299 (k not in include_keys_list and include_keys)):
300 300 del instance_dict[k]
301 301 return instance_dict
302 302
303 303 def get_previous_in_group(self, request):
304 304 query = {
305 305 "size": 1,
306 306 "query": {
307 307 "filtered": {
308 308 "filter": {
309 309 "and": [{"term": {"group_id": self.group_id}},
310 310 {"range": {"pg_id": {"lt": self.id}}}]
311 311 }
312 312 }
313 313 },
314 314 "sort": [
315 315 {"_doc": {"order": "desc"}},
316 316 ],
317 317 }
318 318 result = request.es_conn.search(query, index=self.partition_id,
319 319 doc_type='report')
320 320 if result['hits']['total']:
321 321 return result['hits']['hits'][0]['_source']['pg_id']
322 322
323 323 def get_next_in_group(self, request):
324 324 query = {
325 325 "size": 1,
326 326 "query": {
327 327 "filtered": {
328 328 "filter": {
329 329 "and": [{"term": {"group_id": self.group_id}},
330 330 {"range": {"pg_id": {"gt": self.id}}}]
331 331 }
332 332 }
333 333 },
334 334 "sort": [
335 335 {"_doc": {"order": "asc"}},
336 336 ],
337 337 }
338 338 result = request.es_conn.search(query, index=self.partition_id,
339 339 doc_type='report')
340 340 if result['hits']['total']:
341 341 return result['hits']['hits'][0]['_source']['pg_id']
342 342
343 343 def get_public_url(self, request=None, report_group=None, _app_url=None):
344 344 """
345 345 Returns url that user can use to visit specific report
346 346 """
347 347 if not request:
348 348 request = get_current_request()
349 349 url = request.route_url('/', _app_url=_app_url)
350 350 if report_group:
351 351 return (url + 'ui/report/%s/%s') % (report_group.id, self.id)
352 352 return (url + 'ui/report/%s/%s') % (self.group_id, self.id)
353 353
354 354 def req_stats(self):
355 355 stats = self.request_stats.copy()
356 356 stats['percentages'] = {}
357 357 stats['percentages']['main'] = 100.0
358 358 main = stats.get('main', 0.0)
359 359 if not main:
360 360 return None
361 361 for name, call_time in stats.items():
362 362 if ('calls' not in name and 'main' not in name and
363 363 'percentages' not in name):
364 364 stats['main'] -= call_time
365 365 stats['percentages'][name] = math.floor(
366 366 (call_time / main * 100.0))
367 367 stats['percentages']['main'] -= stats['percentages'][name]
368 368 if stats['percentages']['main'] < 0.0:
369 369 stats['percentages']['main'] = 0.0
370 370 stats['main'] = 0.0
371 371 return stats
372 372
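A worked example of req_stats with made-up request_stats values (not taken from this commit):

r = Report()
r.request_stats = {'main': 2.0, 'sql': 0.5, 'sql_calls': 3, 'tmpl': 0.5}
stats = r.req_stats()
# '*_calls' keys are skipped; 'sql' and 'tmpl' get 25% each and the
# remaining 50% stays attributed to 'main'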
373 373 def generate_grouping_hash(self, hash_string=None, default_grouping=None,
374 374 protocol_version=None):
375 375 """
376 376 Generates SHA1 hash that will be used to group reports together
377 377 """
378 378 if not hash_string:
379 379 location = self.tags.get('view_name') or self.url_path
380 380 server_name = self.tags.get('server_name') or ''
381 381 if default_grouping == 'url_traceback':
382 382 hash_string = '%s_%s_%s' % (self.traceback_hash, location,
383 383 self.error)
384 384 if self.language == Language.javascript:
385 385 hash_string = '%s_%s' % (self.traceback_hash, self.error)
386 386
387 387 elif default_grouping == 'traceback_server':
388 388 hash_string = '%s_%s' % (self.traceback_hash, server_name)
389 389 if self.language == Language.javascript:
390 390 hash_string = '%s_%s' % (self.traceback_hash, server_name)
391 391 else:
392 392 hash_string = '%s_%s' % (self.error, location)
393 393 binary_string = hash_string.encode('utf8')
394 394 self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
395 395 return self.grouping_hash
396 396
397 397 def stripped_traceback(self):
398 398 """
399 399 Traceback without local vars
400 400 """
401 401 stripped_traceback = copy.deepcopy(self.traceback)
402 402
403 403 if isinstance(stripped_traceback, list):
404 404 for row in stripped_traceback:
405 405 row.pop('vars', None)
406 406 return stripped_traceback
407 407
408 408 def notify_channel(self, report_group):
409 409 """
410 410 Sends notification to websocket channel
411 411 """
412 412 settings = get_current_registry().settings
413 log.info('notify cometd')
413 log.info('notify channelstream')
414 414 if self.report_type != ReportType.error:
415 415 return
416 416 payload = {
417 417 'type': 'message',
418 418 "user": '__system__',
419 419 "channel": 'app_%s' % self.resource_id,
420 420 'message': {
421 'type': 'report',
421 'topic': 'front_dashboard.new_topic',
422 422 'report': {
423 423 'group': {
424 424 'priority': report_group.priority,
425 425 'first_timestamp': report_group.first_timestamp,
426 426 'last_timestamp': report_group.last_timestamp,
427 427 'average_duration': report_group.average_duration,
428 428 'occurences': report_group.occurences
429 429 },
430 430 'report_id': self.id,
431 431 'group_id': self.group_id,
432 432 'resource_id': self.resource_id,
433 433 'http_status': self.http_status,
434 434 'url_domain': self.url_domain,
435 435 'url_path': self.url_path,
436 436 'error': self.error or '',
437 437 'server': self.tags.get('server_name'),
438 438 'view_name': self.tags.get('view_name'),
439 439 'front_url': self.get_public_url(),
440 440 }
441 441 }
442 442
443 443 }
444
445 cometd_request(settings['cometd.secret'], '/message', [payload],
444 channelstream_request(settings['cometd.secret'], '/message', [payload],
446 445 servers=[settings['cometd_servers']])
447 446
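Hedged call-site sketch (not from this diff): after a new error report is stored, a view or task could push it to the per-application channel.

# sends a 'front_dashboard.new_topic' message to channel 'app_<resource_id>'
report.notify_channel(report.report_group)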
448 447 def es_doc(self):
449 448 tags = {}
450 449 tag_list = []
451 450 for name, value in self.tags.items():
452 451 name = name.replace('.', '_')
453 452 tag_list.append(name)
454 453 tags[name] = {
455 454 "values": convert_es_type(value),
456 455 "numeric_values": value if (
457 456 isinstance(value, (int, float)) and
458 457 not isinstance(value, bool)) else None}
459 458
460 459 if 'user_name' not in self.tags and self.username:
461 460 tags["user_name"] = {"value": [self.username],
462 461 "numeric_value": None}
463 462 return {
464 463 '_id': str(self.id),
465 464 'pg_id': str(self.id),
466 465 'resource_id': self.resource_id,
467 466 'http_status': self.http_status or '',
468 467 'start_time': self.start_time,
469 468 'end_time': self.end_time,
470 469 'url_domain': self.url_domain if self.url_domain else '',
471 470 'url_path': self.url_path if self.url_path else '',
472 471 'duration': self.duration,
473 472 'error': self.error if self.error else '',
474 473 'report_type': self.report_type,
475 474 'request_id': self.request_id,
476 475 'ip': self.ip,
477 476 'group_id': str(self.group_id),
478 477 '_parent': str(self.group_id),
479 478 'tags': tags,
480 479 'tag_list': tag_list
481 480 }
482 481
483 482 @property
484 483 def partition_id(self):
485 484 return 'rcae_r_%s' % self.report_group_time.strftime('%Y_%m')
486 485
487 486
488 487 def after_insert(mapper, connection, target):
489 488 if not hasattr(target, '_skip_ft_index'):
490 489 data = target.es_doc()
491 490 data.pop('_id', None)
492 491 Datastores.es.index(target.partition_id, 'report', data,
493 492 parent=target.group_id, id=target.id)
494 493
495 494
496 495 def after_update(mapper, connection, target):
497 496 if not hasattr(target, '_skip_ft_index'):
498 497 data = target.es_doc()
499 498 data.pop('_id', None)
500 499 Datastores.es.index(target.partition_id, 'report', data,
501 500 parent=target.group_id, id=target.id)
502 501
503 502
504 503 def after_delete(mapper, connection, target):
505 504 if not hasattr(target, '_skip_ft_index'):
506 505 query = {'term': {'pg_id': target.id}}
507 506 Datastores.es.delete_by_query(target.partition_id, 'report', query)
508 507
509 508
510 509 sa.event.listen(Report, 'after_insert', after_insert)
511 510 sa.event.listen(Report, 'after_update', after_update)
512 511 sa.event.listen(Report, 'after_delete', after_delete)