redis: some cleanups and use of pipelines for better performance
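The diffs below replace sequences of individual Redis round trips (incr, expire, sadd, setex) with a single client-side pipeline that is flushed once per task or request. A minimal sketch of the pattern, assuming a redis-py StrictRedis client; the key name is a placeholder, not one used in the diffs:

    import redis

    r = redis.StrictRedis()
    key = 'appenlight:data:example_counter'

    # Before: each command is its own network round trip.
    r.incr(key, 5)
    r.expire(key, 3600 * 24)

    # After: commands are buffered locally and sent in one round trip.
    # transaction=False skips the MULTI/EXEC wrapper, since these counters
    # do not need to be applied atomically.
    pipe = r.pipeline(transaction=False)
    pipe.incr(key, 5)
    pipe.expire(key, 3600 * 24)
    pipe.execute()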
@@ -1,635 +1,650 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 import bisect
23 23 import collections
24 24 import math
25 25 from datetime import datetime, timedelta
26 26
27 27 import sqlalchemy as sa
28 28 import pyelasticsearch
29 29
30 30 from celery.utils.log import get_task_logger
31 31 from zope.sqlalchemy import mark_changed
32 32 from pyramid.threadlocal import get_current_request, get_current_registry
33 33 from appenlight.celery import celery
34 34 from appenlight.models.report_group import ReportGroup
35 35 from appenlight.models import DBSession, Datastores
36 36 from appenlight.models.report import Report
37 37 from appenlight.models.log import Log
38 38 from appenlight.models.metric import Metric
39 39 from appenlight.models.event import Event
40 40
41 41 from appenlight.models.services.application import ApplicationService
42 42 from appenlight.models.services.event import EventService
43 43 from appenlight.models.services.log import LogService
44 44 from appenlight.models.services.report import ReportService
45 45 from appenlight.models.services.report_group import ReportGroupService
46 46 from appenlight.models.services.user import UserService
47 47 from appenlight.models.tag import Tag
48 48 from appenlight.lib import print_traceback
49 49 from appenlight.lib.utils import parse_proto, in_batches
50 50 from appenlight.lib.ext_json import json
51 51 from appenlight.lib.redis_keys import REDIS_KEYS
52 52 from appenlight.lib.enums import ReportType
53 53
54 54 log = get_task_logger(__name__)
55 55
56 56 sample_boundries = list(range(100, 1000, 100)) + \
57 57 list(range(1000, 10000, 1000)) + \
58 58 list(range(10000, 100000, 5000))
59 59
60 60
61 61 def pick_sample(total_occurences, report_type=None):
62 62 every = 1.0
63 63 position = bisect.bisect_left(sample_boundries, total_occurences)
64 64 if position > 0:
65 65 if report_type == ReportType.not_found:
66 66 divide = 10.0
67 67 else:
68 68 divide = 100.0
69 69 every = sample_boundries[position - 1] / divide
70 70 return total_occurences % every == 0
71 71
72 72
73 73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
74 74 def test_exception_task():
75 75 log.error('test celery log', extra={'location': 'celery'})
76 76 log.warning('test celery log', extra={'location': 'celery'})
77 77 raise Exception('Celery exception test')
78 78
79 79
80 80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
81 81 def test_retry_exception_task():
82 82 try:
83 83 import time
84 84
85 85 time.sleep(1.3)
86 86 log.error('test retry celery log', extra={'location': 'celery'})
87 87 log.warning('test retry celery log', extra={'location': 'celery'})
88 88 raise Exception('Celery exception test')
89 89 except Exception as exc:
90 90 test_retry_exception_task.retry(exc=exc)
91 91
92 92
93 93 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
94 def add_reports(resource_id, params, dataset, environ=None, **kwargs):
95 proto_version = parse_proto(params.get('protocol_version', ''))
94 def add_reports(resource_id, request_params, dataset, **kwargs):
95 proto_version = parse_proto(request_params.get('protocol_version', ''))
96 96 current_time = datetime.utcnow().replace(second=0, microsecond=0)
97 97 try:
98 98 # we will store solr docs here for single insert
99 99 es_report_docs = {}
100 100 es_report_group_docs = {}
101 101 resource = ApplicationService.by_id(resource_id)
102 102
103 103 tags = []
104 104 es_slow_calls_docs = {}
105 105 es_reports_stats_rows = {}
106 106 for report_data in dataset:
107 107 # build report details for later
108 108 added_details = 0
109 109 report = Report()
110 110 report.set_data(report_data, resource, proto_version)
111 111 report._skip_ft_index = True
112 112
113 113 report_group = ReportGroupService.by_hash_and_resource(
114 114 report.resource_id,
115 115 report.grouping_hash
116 116 )
117 117 occurences = report_data.get('occurences', 1)
118 118 if not report_group:
119 119 # total reports will be +1 moment later
120 120 report_group = ReportGroup(grouping_hash=report.grouping_hash,
121 121 occurences=0, total_reports=0,
122 122 last_report=0,
123 123 priority=report.priority,
124 124 error=report.error,
125 125 first_timestamp=report.start_time)
126 126 report_group._skip_ft_index = True
127 127 report_group.report_type = report.report_type
128 128 report.report_group_time = report_group.first_timestamp
129 129 add_sample = pick_sample(report_group.occurences,
130 130 report_type=report_group.report_type)
131 131 if add_sample:
132 132 resource.report_groups.append(report_group)
133 133 report_group.reports.append(report)
134 134 added_details += 1
135 135 DBSession.flush()
136 136 if report.partition_id not in es_report_docs:
137 137 es_report_docs[report.partition_id] = []
138 138 es_report_docs[report.partition_id].append(report.es_doc())
139 139 tags.extend(list(report.tags.items()))
140 140 slow_calls = report.add_slow_calls(report_data, report_group)
141 141 DBSession.flush()
142 142 for s_call in slow_calls:
143 143 if s_call.partition_id not in es_slow_calls_docs:
144 144 es_slow_calls_docs[s_call.partition_id] = []
145 145 es_slow_calls_docs[s_call.partition_id].append(
146 146 s_call.es_doc())
147 147 # try generating new stat rows if needed
148 148 else:
149 149 # required for postprocessing to not fail later
150 150 report.report_group = report_group
151 151
152 152 stat_row = ReportService.generate_stat_rows(
153 153 report, resource, report_group)
154 154 if stat_row.partition_id not in es_reports_stats_rows:
155 155 es_reports_stats_rows[stat_row.partition_id] = []
156 156 es_reports_stats_rows[stat_row.partition_id].append(
157 157 stat_row.es_doc())
158 158
159 159 # see if we should mark 10th occurence of report
160 160 last_occurences_10 = int(math.floor(report_group.occurences / 10))
161 161 curr_occurences_10 = int(math.floor(
162 162 (report_group.occurences + report.occurences) / 10))
163 163 last_occurences_100 = int(
164 164 math.floor(report_group.occurences / 100))
165 165 curr_occurences_100 = int(math.floor(
166 166 (report_group.occurences + report.occurences) / 100))
167 167 notify_occurences_10 = last_occurences_10 != curr_occurences_10
168 168 notify_occurences_100 = last_occurences_100 != curr_occurences_100
169 169 report_group.occurences = ReportGroup.occurences + occurences
170 170 report_group.last_timestamp = report.start_time
171 171 report_group.summed_duration = ReportGroup.summed_duration + report.duration
172 172 summed_duration = ReportGroup.summed_duration + report.duration
173 173 summed_occurences = ReportGroup.occurences + occurences
174 174 report_group.average_duration = summed_duration / summed_occurences
175 175 report_group.run_postprocessing(report)
176 176 if added_details:
177 177 report_group.total_reports = ReportGroup.total_reports + 1
178 178 report_group.last_report = report.id
179 179 report_group.set_notification_info(notify_10=notify_occurences_10,
180 180 notify_100=notify_occurences_100)
181 181 DBSession.flush()
182 182 report_group.get_report().notify_channel(report_group)
183 183 if report_group.partition_id not in es_report_group_docs:
184 184 es_report_group_docs[report_group.partition_id] = []
185 185 es_report_group_docs[report_group.partition_id].append(
186 186 report_group.es_doc())
187 187
188 188 action = 'REPORT'
189 189 log_msg = '%s: %s %s, client: %s, proto: %s' % (
190 190 action,
191 191 report_data.get('http_status', 'unknown'),
192 192 str(resource),
193 193 report_data.get('client'),
194 194 proto_version)
195 195 log.info(log_msg)
196 196 total_reports = len(dataset)
197 redis_pipeline = Datastores.redis.pipeline(transaction=False)
197 198 key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
198 Datastores.redis.incr(key, total_reports)
199 Datastores.redis.expire(key, 3600 * 24)
200 key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
201 resource_id, current_time)
202 Datastores.redis.incr(key, total_reports)
203 Datastores.redis.expire(key, 3600 * 24)
199 redis_pipeline.incr(key, total_reports)
200 redis_pipeline.expire(key, 3600 * 24)
201 key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
202 resource_id, current_time.replace(minute=0))
203 redis_pipeline.incr(key, total_reports)
204 redis_pipeline.expire(key, 3600 * 24 * 7)
205 redis_pipeline.sadd(
206 REDIS_KEYS['apps_that_got_new_data_per_hour'],
207 resource_id, current_time.replace(minute=0))
208 redis_pipeline.execute()
204 209
205 210 add_reports_es(es_report_group_docs, es_report_docs)
206 211 add_reports_slow_calls_es(es_slow_calls_docs)
207 212 add_reports_stats_rows_es(es_reports_stats_rows)
208 213 return True
209 214 except Exception as exc:
210 215 print_traceback(log)
211 216 add_reports.retry(exc=exc)
212 217
213 218
214 219 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
215 220 def add_reports_es(report_group_docs, report_docs):
216 221 for k, v in report_group_docs.items():
217 222 Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
218 223 for k, v in report_docs.items():
219 224 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
220 225 parent_field='_parent')
221 226
222 227
223 228 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
224 229 def add_reports_slow_calls_es(es_docs):
225 230 for k, v in es_docs.items():
226 231 Datastores.es.bulk_index(k, 'log', v)
227 232
228 233
229 234 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
230 235 def add_reports_stats_rows_es(es_docs):
231 236 for k, v in es_docs.items():
232 237 Datastores.es.bulk_index(k, 'log', v)
233 238
234 239
235 240 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
236 def add_logs(resource_id, request, dataset, environ=None, **kwargs):
237 proto_version = request.get('protocol_version')
241 def add_logs(resource_id, request_params, dataset, **kwargs):
242 proto_version = request_params.get('protocol_version')
238 243 current_time = datetime.utcnow().replace(second=0, microsecond=0)
239 244
240 245 try:
241 246 es_docs = collections.defaultdict(list)
242 247 application = ApplicationService.by_id(resource_id)
243 248 ns_pairs = []
244 249 for entry in dataset:
245 250 # gather pk and ns so we can remove older versions of row later
246 251 if entry['primary_key'] is not None:
247 252 ns_pairs.append({"pk": entry['primary_key'],
248 253 "ns": entry['namespace']})
249 254 log_entry = Log()
250 255 log_entry.set_data(entry, resource=application)
251 256 log_entry._skip_ft_index = True
252 257 application.logs.append(log_entry)
253 258 DBSession.flush()
254 259 # insert non pk rows first
255 260 if entry['primary_key'] is None:
256 261 es_docs[log_entry.partition_id].append(log_entry.es_doc())
257 262
258 263         # 2nd pass to delete all log entries from db for same pk/ns pair
259 264 if ns_pairs:
260 265 ids_to_delete = []
261 266 es_docs = collections.defaultdict(list)
262 267 es_docs_to_delete = collections.defaultdict(list)
263 268 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
264 269 list_of_pairs=ns_pairs)
265 270 log_dict = {}
266 271 for log_entry in found_pkey_logs:
267 272 log_key = (log_entry.primary_key, log_entry.namespace)
268 273 if log_key not in log_dict:
269 274 log_dict[log_key] = []
270 275 log_dict[log_key].append(log_entry)
271 276
272 277 for ns, entry_list in log_dict.items():
273 278 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
274 279 # newest row needs to be indexed in es
275 280 log_entry = entry_list[-1]
276 281 # delete everything from pg and ES, leave the last row in pg
277 282 for e in entry_list[:-1]:
278 283 ids_to_delete.append(e.log_id)
279 284 es_docs_to_delete[e.partition_id].append(e.delete_hash)
280 285
281 286 es_docs_to_delete[log_entry.partition_id].append(
282 287 log_entry.delete_hash)
283 288
284 289 es_docs[log_entry.partition_id].append(log_entry.es_doc())
285 290
286 291 if ids_to_delete:
287 292 query = DBSession.query(Log).filter(
288 293 Log.log_id.in_(ids_to_delete))
289 294 query.delete(synchronize_session=False)
290 295 if es_docs_to_delete:
291 296 # batch this to avoid problems with default ES bulk limits
292 297 for es_index in es_docs_to_delete.keys():
293 298 for batch in in_batches(es_docs_to_delete[es_index], 20):
294 299 query = {'terms': {'delete_hash': batch}}
295 300
296 301 try:
297 302 Datastores.es.delete_by_query(
298 303 es_index, 'log', query)
299 304 except pyelasticsearch.ElasticHttpNotFoundError as exc:
300 305 msg = 'skipping index {}'.format(es_index)
301 306 log.info(msg)
302 307
303 308 total_logs = len(dataset)
304 309
305 310 log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
306 311 str(application),
307 312 total_logs,
308 313 proto_version)
309 314 log.info(log_msg)
310 315 # mark_changed(session)
316 redis_pipeline = Datastores.redis.pipeline(transaction=False)
311 317 key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
312 Datastores.redis.incr(key, total_logs)
313 Datastores.redis.expire(key, 3600 * 24)
314 key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
315 resource_id, current_time)
316 Datastores.redis.incr(key, total_logs)
317 Datastores.redis.expire(key, 3600 * 24)
318 redis_pipeline.incr(key, total_logs)
319 redis_pipeline.expire(key, 3600 * 24)
320 key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format(
321 resource_id, current_time.replace(minute=0))
322 redis_pipeline.incr(key, total_logs)
323 redis_pipeline.expire(key, 3600 * 24 * 7)
324 redis_pipeline.sadd(
325 REDIS_KEYS['apps_that_got_new_data_per_hour'],
326 resource_id, current_time.replace(minute=0))
327 redis_pipeline.execute()
318 328 add_logs_es(es_docs)
319 329 return True
320 330 except Exception as exc:
321 331 print_traceback(log)
322 332 add_logs.retry(exc=exc)
323 333
324 334
325 335 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
326 336 def add_logs_es(es_docs):
327 337 for k, v in es_docs.items():
328 338 Datastores.es.bulk_index(k, 'log', v)
329 339
330 340
331 341 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
332 def add_metrics(resource_id, request, dataset, proto_version):
342 def add_metrics(resource_id, request_params, dataset, proto_version):
333 343 current_time = datetime.utcnow().replace(second=0, microsecond=0)
334 344 try:
335 345 application = ApplicationService.by_id_cached()(resource_id)
336 346 application = DBSession.merge(application, load=False)
337 347 es_docs = []
338 348 rows = []
339 349 for metric in dataset:
340 350 tags = dict(metric['tags'])
341 351 server_n = tags.get('server_name', metric['server_name']).lower()
342 352 tags['server_name'] = server_n or 'unknown'
343 353 new_metric = Metric(
344 354 timestamp=metric['timestamp'],
345 355 resource_id=application.resource_id,
346 356 namespace=metric['namespace'],
347 357 tags=tags)
348 358 rows.append(new_metric)
349 359 es_docs.append(new_metric.es_doc())
350 360 session = DBSession()
351 361 session.bulk_save_objects(rows)
352 362 session.flush()
353 363
354 364 action = 'METRICS'
355 365 metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
356 366 action,
357 367 str(application),
358 368 len(dataset),
359 369 proto_version
360 370 )
361 371 log.info(metrics_msg)
362 372
363 373 mark_changed(session)
374 redis_pipeline = Datastores.redis.pipeline(transaction=False)
364 375 key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
365 Datastores.redis.incr(key, len(rows))
366 Datastores.redis.expire(key, 3600 * 24)
367 key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
368 resource_id, current_time)
369 Datastores.redis.incr(key, len(rows))
370 Datastores.redis.expire(key, 3600 * 24)
376 redis_pipeline.incr(key, len(rows))
377 redis_pipeline.expire(key, 3600 * 24)
378 key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format(
379 resource_id, current_time.replace(minute=0))
380 redis_pipeline.incr(key, len(rows))
381 redis_pipeline.expire(key, 3600 * 24 * 7)
382 redis_pipeline.sadd(
383 REDIS_KEYS['apps_that_got_new_data_per_hour'],
384 resource_id, current_time.replace(minute=0))
385 redis_pipeline.execute()
371 386 add_metrics_es(es_docs)
372 387 return True
373 388 except Exception as exc:
374 389 print_traceback(log)
375 390 add_metrics.retry(exc=exc)
376 391
377 392
378 393 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
379 394 def add_metrics_es(es_docs):
380 395 for doc in es_docs:
381 396 partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
382 397 Datastores.es.index(partition, 'log', doc)
383 398
384 399
385 400 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
386 401 def check_user_report_notifications(resource_id):
387 402 since_when = datetime.utcnow()
388 403 try:
389 404 request = get_current_request()
390 405 application = ApplicationService.by_id(resource_id)
391 406 if not application:
392 407 return
393 408 error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
394 409 ReportType.error, resource_id)
395 410 slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
396 411 ReportType.slow, resource_id)
397 412 error_group_ids = Datastores.redis.smembers(error_key)
398 413 slow_group_ids = Datastores.redis.smembers(slow_key)
399 414 Datastores.redis.delete(error_key)
400 415 Datastores.redis.delete(slow_key)
401 416 err_gids = [int(g_id) for g_id in error_group_ids]
402 417 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
403 418 group_ids = err_gids + slow_gids
404 419 occurence_dict = {}
405 420 for g_id in group_ids:
406 421 key = REDIS_KEYS['counters']['report_group_occurences'].format(
407 422 g_id)
408 423 val = Datastores.redis.get(key)
409 424 Datastores.redis.delete(key)
410 425 if val:
411 426 occurence_dict[g_id] = int(val)
412 427 else:
413 428 occurence_dict[g_id] = 1
414 429 report_groups = ReportGroupService.by_ids(group_ids)
415 430 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
416 431
417 432 ApplicationService.check_for_groups_alert(
418 433 application, 'alert', report_groups=report_groups,
419 434 occurence_dict=occurence_dict)
420 435 users = set([p.user for p in application.users_for_perm('view')])
421 436 report_groups = report_groups.all()
422 437 for user in users:
423 438 UserService.report_notify(user, request, application,
424 439 report_groups=report_groups,
425 440 occurence_dict=occurence_dict)
426 441 for group in report_groups:
427 442 # marks report_groups as notified
428 443 if not group.notified:
429 444 group.notified = True
430 445 except Exception as exc:
431 446 print_traceback(log)
432 447 raise
433 448
434 449
435 450 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
436 451 def check_alerts(resource_id):
437 452 since_when = datetime.utcnow()
438 453 try:
439 454 request = get_current_request()
440 455 application = ApplicationService.by_id(resource_id)
441 456 if not application:
442 457 return
443 458 error_key = REDIS_KEYS[
444 459 'reports_to_notify_per_type_per_app_alerting'].format(
445 460 ReportType.error, resource_id)
446 461 slow_key = REDIS_KEYS[
447 462 'reports_to_notify_per_type_per_app_alerting'].format(
448 463 ReportType.slow, resource_id)
449 464 error_group_ids = Datastores.redis.smembers(error_key)
450 465 slow_group_ids = Datastores.redis.smembers(slow_key)
451 466 Datastores.redis.delete(error_key)
452 467 Datastores.redis.delete(slow_key)
453 468 err_gids = [int(g_id) for g_id in error_group_ids]
454 469 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
455 470 group_ids = err_gids + slow_gids
456 471 occurence_dict = {}
457 472 for g_id in group_ids:
458 473 key = REDIS_KEYS['counters'][
459 474 'report_group_occurences_alerting'].format(
460 475 g_id)
461 476 val = Datastores.redis.get(key)
462 477 Datastores.redis.delete(key)
463 478 if val:
464 479 occurence_dict[g_id] = int(val)
465 480 else:
466 481 occurence_dict[g_id] = 1
467 482 report_groups = ReportGroupService.by_ids(group_ids)
468 483 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
469 484
470 485 ApplicationService.check_for_groups_alert(
471 486 application, 'alert', report_groups=report_groups,
472 487 occurence_dict=occurence_dict, since_when=since_when)
473 488 except Exception as exc:
474 489 print_traceback(log)
475 490 raise
476 491
477 492
478 493 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
479 494 def close_alerts():
480 495 log.warning('Checking alerts')
481 496 since_when = datetime.utcnow()
482 497 try:
483 498 event_types = [Event.types['error_report_alert'],
484 499 Event.types['slow_report_alert'], ]
485 500 statuses = [Event.statuses['active']]
486 501 # get events older than 5 min
487 502 events = EventService.by_type_and_status(
488 503 event_types,
489 504 statuses,
490 505 older_than=(since_when - timedelta(minutes=5)))
491 506 for event in events:
492 507 # see if we can close them
493 508 event.validate_or_close(
494 509 since_when=(since_when - timedelta(minutes=1)))
495 510 except Exception as exc:
496 511 print_traceback(log)
497 512 raise
498 513
499 514
500 515 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
501 516 def update_tag_counter(tag_name, tag_value, count):
502 517 try:
503 518 query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
504 519 sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
505 520 sa.types.TEXT))
506 521 query.update({'times_seen': Tag.times_seen + count,
507 522 'last_timestamp': datetime.utcnow()},
508 523 synchronize_session=False)
509 524 session = DBSession()
510 525 mark_changed(session)
511 526 return True
512 527 except Exception as exc:
513 528 print_traceback(log)
514 529 update_tag_counter.retry(exc=exc)
515 530
516 531
517 532 @celery.task(queue="default")
518 533 def update_tag_counters():
519 534 """
520 535 Sets task to update counters for application tags
521 536 """
522 537 tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
523 538 Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
524 539 c = collections.Counter(tags)
525 540 for t_json, count in c.items():
526 541 tag_info = json.loads(t_json)
527 542 update_tag_counter.delay(tag_info[0], tag_info[1], count)
528 543
529 544
530 545 @celery.task(queue="default")
531 546 def daily_digest():
532 547 """
533 548 Sends daily digest with top 50 error reports
534 549 """
535 550 request = get_current_request()
536 551 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
537 552 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
538 553 since_when = datetime.utcnow() - timedelta(hours=8)
539 554 log.warning('Generating daily digests')
540 555 for resource_id in apps:
541 556 resource_id = resource_id.decode('utf8')
542 557 end_date = datetime.utcnow().replace(microsecond=0, second=0)
543 558 filter_settings = {'resource': [resource_id],
544 559 'tags': [{'name': 'type',
545 560 'value': ['error'], 'op': None}],
546 561 'type': 'error', 'start_date': since_when,
547 562 'end_date': end_date}
548 563
549 564 reports = ReportGroupService.get_trending(
550 565 request, filter_settings=filter_settings, limit=50)
551 566
552 567 application = ApplicationService.by_id(resource_id)
553 568 if application:
554 569 users = set([p.user for p in application.users_for_perm('view')])
555 570 for user in users:
556 571 user.send_digest(request, application, reports=reports,
557 572 since_when=since_when)
558 573
559 574
560 575 @celery.task(queue="default")
561 576 def notifications_reports():
562 577 """
563 578 Loop that checks redis for info and then issues new tasks to celery to
564 579 issue notifications
565 580 """
566 581 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
567 582 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
568 583 for app in apps:
569 584 log.warning('Notify for app: %s' % app)
570 585 check_user_report_notifications.delay(app.decode('utf8'))
571 586
572 587 @celery.task(queue="default")
573 588 def alerting_reports():
574 589 """
575 590 Loop that checks redis for info and then issues new tasks to celery to
576 591 perform the following:
577 592 - which applications should have new alerts opened
578 593 """
579 594
580 595 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
581 596 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
582 597 for app in apps:
583 598 log.warning('Notify for app: %s' % app)
584 599 check_alerts.delay(app.decode('utf8'))
585 600
586 601
587 602 @celery.task(queue="default", soft_time_limit=3600 * 4,
588 603 hard_time_limit=3600 * 4, max_retries=144)
589 604 def logs_cleanup(resource_id, filter_settings):
590 605 request = get_current_request()
591 606 request.tm.begin()
592 607 es_query = {
593 608 "_source": False,
594 609 "size": 5000,
595 610 "query": {
596 611 "filtered": {
597 612 "filter": {
598 613 "and": [{"term": {"resource_id": resource_id}}]
599 614 }
600 615 }
601 616 }
602 617 }
603 618
604 619 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
605 620 if filter_settings['namespace']:
606 621 query = query.filter(Log.namespace == filter_settings['namespace'][0])
607 622 es_query['query']['filtered']['filter']['and'].append(
608 623 {"term": {"namespace": filter_settings['namespace'][0]}}
609 624 )
610 625 query.delete(synchronize_session=False)
611 626 request.tm.commit()
612 627 result = request.es_conn.search(es_query, index='rcae_l_*',
613 628 doc_type='log', es_scroll='1m',
614 629 es_search_type='scan')
615 630 scroll_id = result['_scroll_id']
616 631 while True:
617 632 log.warning('log_cleanup, app:{} ns:{} batch'.format(
618 633 resource_id,
619 634 filter_settings['namespace']
620 635 ))
621 636 es_docs_to_delete = []
622 637 result = request.es_conn.send_request(
623 638 'POST', ['_search', 'scroll'],
624 639 body=scroll_id, query_params={"scroll": '1m'})
625 640 scroll_id = result['_scroll_id']
626 641 if not result['hits']['hits']:
627 642 break
628 643 for doc in result['hits']['hits']:
629 644 es_docs_to_delete.append({"id": doc['_id'],
630 645 "index": doc['_index']})
631 646
632 647 for batch in in_batches(es_docs_to_delete, 10):
633 648 Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
634 649 **to_del)
635 650 for to_del in batch])
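The three ingest tasks above (add_reports, add_logs, add_metrics) now share the same bookkeeping: bump the global per-minute counter, bump the new per-hour-per-app counter, record the application in apps_that_got_new_data_per_hour, and flush everything with one execute(). A sketch of that shared pattern factored into a helper; update_ingest_counters is a hypothetical name, the diff keeps the logic inlined in each task:

    from datetime import datetime

    from appenlight.models import Datastores
    from appenlight.lib.redis_keys import REDIS_KEYS


    def update_ingest_counters(resource_id, amount, minute_key, hour_key):
        # hypothetical helper mirroring the inlined pipeline code above
        current_time = datetime.utcnow().replace(second=0, microsecond=0)
        hour_time = current_time.replace(minute=0)
        pipeline = Datastores.redis.pipeline(transaction=False)
        # global per-minute counter, kept for a day
        key = REDIS_KEYS['counters'][minute_key].format(current_time)
        pipeline.incr(key, amount)
        pipeline.expire(key, 3600 * 24)
        # per-application hourly counter, kept for a week
        key = REDIS_KEYS['counters'][hour_key].format(resource_id, hour_time)
        pipeline.incr(key, amount)
        pipeline.expire(key, 3600 * 24 * 7)
        # remember which applications received data this hour
        pipeline.sadd(REDIS_KEYS['apps_that_got_new_data_per_hour'],
                      resource_id, hour_time)
        pipeline.execute()

In the diff the equivalent inlined code uses the key pairs ('reports_per_minute', 'reports_per_hour_per_app'), ('logs_per_minute', 'logs_per_hour_per_app') and ('metrics_per_minute', 'metrics_per_hour_per_app') respectively.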
@@ -1,83 +1,86 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 import datetime
23 23 import logging
24 24
25 25 from pyramid.httpexceptions import HTTPForbidden, HTTPTooManyRequests
26 26
27 27 from appenlight.models import Datastores
28 28 from appenlight.models.services.config import ConfigService
29 29 from appenlight.lib.redis_keys import REDIS_KEYS
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33
34 34 def rate_limiting(request, resource, section, to_increment=1):
35 35 tsample = datetime.datetime.utcnow().replace(second=0, microsecond=0)
36 36 key = REDIS_KEYS['rate_limits'][section].format(tsample,
37 37 resource.resource_id)
38 current_count = Datastores.redis.incr(key, to_increment)
39 Datastores.redis.expire(key, 3600 * 24)
38 redis_pipeline = request.registry.redis_conn.pipeline()
39 redis_pipeline.incr(key, to_increment)
40 redis_pipeline.expire(key, 3600 * 24)
41 results = redis_pipeline.execute()
42 current_count = results[0]
40 43 config = ConfigService.by_key_and_section(section, 'global')
41 44 limit = config.value if config else 1000
42 45 if current_count > int(limit):
43 46 log.info('RATE LIMITING: {}: {}, {}'.format(
44 47 section, resource, current_count))
45 48 abort_msg = 'Rate limits are in effect for this application'
46 49 raise HTTPTooManyRequests(abort_msg,
47 50 headers={'X-AppEnlight': abort_msg})
48 51
49 52
50 53 def check_cors(request, application, should_return=True):
51 54 """
52 55 Performs a check and validation if request comes from authorized domain for
53 56 application, otherwise return 403
54 57 """
55 58 origin_found = False
56 59 origin = request.headers.get('Origin')
57 60 if should_return:
58 61 log.info('CORS for %s' % origin)
59 62 if not origin:
60 63 return False
61 64 for domain in application.domains.split('\n'):
62 65 if domain in origin:
63 66 origin_found = True
64 67 if origin_found:
65 68 request.response.headers.add('Access-Control-Allow-Origin', origin)
66 69 request.response.headers.add('XDomainRequestAllowed', '1')
67 70 request.response.headers.add('Access-Control-Allow-Methods',
68 71 'GET, POST, OPTIONS')
69 72 request.response.headers.add('Access-Control-Allow-Headers',
70 73 'Accept-Encoding, Accept-Language, '
71 74 'Content-Type, '
72 75 'Depth, User-Agent, X-File-Size, '
73 76 'X-Requested-With, If-Modified-Since, '
74 77 'X-File-Name, '
75 78 'Cache-Control, Host, Pragma, Accept, '
76 79 'Origin, Connection, '
77 80 'Referer, Cookie, '
78 81 'X-appenlight-public-api-key, '
79 82 'x-appenlight-public-api-key')
80 83 request.response.headers.add('Access-Control-Max-Age', '86400')
81 84 return request.response
82 85 else:
83 86 return HTTPForbidden()
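Unlike the ingest tasks, rate_limiting needs the value that INCR returns, so it reads it back from the list returned by execute(): one entry per queued command, in the order the commands were queued. A minimal standalone sketch of that idiom, assuming a plain redis-py client and a placeholder key:

    import redis

    r = redis.StrictRedis()
    key = 'appenlight:data:example_rate_limit'

    pipe = r.pipeline()
    pipe.incr(key, 1)            # result index 0
    pipe.expire(key, 3600 * 24)  # result index 1
    results = pipe.execute()
    current_count = results[0]   # counter value after the INCR
    if current_count > 1000:
        print('rate limit exceeded')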
@@ -1,67 +1,68 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 BASE = 'appenlight:data:{}'
23 23
24 24 REDIS_KEYS = {
25 25 'tasks': {
26 26 'add_reports_lock': BASE.format('add_reports_lock:{}'),
27 27 'add_logs_lock': BASE.format('add_logs_lock:{}'),
28 28 },
29 29 'counters': {
30 30 'reports_per_minute': BASE.format('reports_per_minute:{}'),
31 'reports_per_minute_per_app': BASE.format(
32 'reports_per_minute_per_app:{}:{}'),
31 'reports_per_hour_per_app': BASE.format(
32 'reports_per_hour_per_app:{}:{}'),
33 33 'reports_per_type': BASE.format('reports_per_type:{}'),
34 34 'logs_per_minute': BASE.format('logs_per_minute:{}'),
35 'logs_per_minute_per_app': BASE.format(
36 'logs_per_minute_per_app:{}:{}'),
35 'logs_per_hour_per_app': BASE.format(
36 'logs_per_hour_per_app:{}:{}'),
37 37 'metrics_per_minute': BASE.format('metrics_per_minute:{}'),
38 'metrics_per_minute_per_app': BASE.format(
39 'metrics_per_minute_per_app:{}:{}'),
38 'metrics_per_hour_per_app': BASE.format(
39 'metrics_per_hour_per_app:{}:{}'),
40 40 'report_group_occurences': BASE.format('report_group_occurences:{}'),
41 41 'report_group_occurences_alerting': BASE.format(
42 42 'report_group_occurences_alerting:{}'),
43 43 'report_group_occurences_10th': BASE.format(
44 44 'report_group_occurences_10th:{}'),
45 45 'report_group_occurences_100th': BASE.format(
46 46 'report_group_occurences_100th:{}'),
47 47 },
48 48 'rate_limits': {
49 49 'per_application_reports_rate_limit': BASE.format(
50 50 'per_application_reports_limit:{}:{}'),
51 51 'per_application_logs_rate_limit': BASE.format(
52 52 'per_application_logs_rate_limit:{}:{}'),
53 53 'per_application_metrics_rate_limit': BASE.format(
54 54 'per_application_metrics_rate_limit:{}:{}'),
55 55 },
56 'apps_that_got_new_data_per_hour': BASE.format('apps_that_got_new_data_per_hour'),
56 57 'apps_that_had_reports': BASE.format('apps_that_had_reports'),
57 58 'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
58 59 'apps_that_had_reports_alerting': BASE.format(
59 60 'apps_that_had_reports_alerting'),
60 61 'apps_that_had_error_reports_alerting': BASE.format(
61 62 'apps_that_had_error_reports_alerting'),
62 63 'reports_to_notify_per_type_per_app': BASE.format(
63 64 'reports_to_notify_per_type_per_app:{}:{}'),
64 65 'reports_to_notify_per_type_per_app_alerting': BASE.format(
65 66 'reports_to_notify_per_type_per_app_alerting:{}:{}'),
66 67 'seen_tag_list': BASE.format('seen_tag_list')
67 68 }
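In the key map above, the per-app counter templates move from per-minute to per-hour granularity and a new apps_that_got_new_data_per_hour set is introduced. For reference, this is how a template expands into a concrete key name; the resource id and timestamp are example values:

    from datetime import datetime

    from appenlight.lib.redis_keys import REDIS_KEYS

    resource_id = 42
    hour = datetime(2016, 6, 21, 14, 0)  # current hour, minutes zeroed

    key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
        resource_id, hour)
    print(key)
    # appenlight:data:reports_per_hour_per_app:42:2016-06-21 14:00:00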
@@ -1,267 +1,268 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 import logging
23 23 import sqlalchemy as sa
24 24
25 25 from datetime import datetime
26 26
27 27 from pyramid.threadlocal import get_current_request
28 28 from sqlalchemy.dialects.postgresql import JSON
29 29 from ziggurat_foundations.models.base import BaseModel
30 30
31 31 from appenlight.models import Base, get_db_session, Datastores
32 32 from appenlight.lib.enums import ReportType
33 33 from appenlight.lib.rule import Rule
34 34 from appenlight.lib.redis_keys import REDIS_KEYS
35 35 from appenlight.models.report import REPORT_TYPE_MATRIX
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39
40 40 class ReportGroup(Base, BaseModel):
41 41 __tablename__ = 'reports_groups'
42 42 __table_args__ = {'implicit_returning': False}
43 43
44 44 id = sa.Column(sa.BigInteger(), nullable=False, primary_key=True)
45 45 resource_id = sa.Column(sa.Integer(),
46 46 sa.ForeignKey('applications.resource_id',
47 47 onupdate='CASCADE',
48 48 ondelete='CASCADE'),
49 49 nullable=False,
50 50 index=True)
51 51 priority = sa.Column(sa.Integer, nullable=False, index=True, default=5,
52 52 server_default='5')
53 53 first_timestamp = sa.Column(sa.DateTime(), default=datetime.utcnow,
54 54 server_default=sa.func.now())
55 55 last_timestamp = sa.Column(sa.DateTime(), default=datetime.utcnow,
56 56 server_default=sa.func.now())
57 57 error = sa.Column(sa.UnicodeText(), index=True)
58 58 grouping_hash = sa.Column(sa.String(40), default='')
59 59 triggered_postprocesses_ids = sa.Column(JSON(), nullable=False,
60 60 default=list)
61 61 report_type = sa.Column(sa.Integer, default=1)
62 62 total_reports = sa.Column(sa.Integer, default=1)
63 63 last_report = sa.Column(sa.Integer)
64 64 occurences = sa.Column(sa.Integer, default=1)
65 65 average_duration = sa.Column(sa.Float, default=0)
66 66 summed_duration = sa.Column(sa.Float, default=0)
67 67 read = sa.Column(sa.Boolean(), index=True, default=False)
68 68 fixed = sa.Column(sa.Boolean(), index=True, default=False)
69 69 notified = sa.Column(sa.Boolean(), index=True, default=False)
70 70 public = sa.Column(sa.Boolean(), index=True, default=False)
71 71
72 72 reports = sa.orm.relationship('Report',
73 73 lazy='dynamic',
74 74 backref='report_group',
75 75 cascade="all, delete-orphan",
76 76 passive_deletes=True,
77 77 passive_updates=True, )
78 78
79 79 comments = sa.orm.relationship('ReportComment',
80 80 lazy='dynamic',
81 81 backref='report',
82 82 cascade="all, delete-orphan",
83 83 passive_deletes=True,
84 84 passive_updates=True,
85 85 order_by="ReportComment.comment_id")
86 86
87 87 assigned_users = sa.orm.relationship('User',
88 88 backref=sa.orm.backref(
89 89 'assigned_reports_relation',
90 90 lazy='dynamic',
91 91 order_by=sa.desc(
92 92 "reports_groups.id")
93 93 ),
94 94 passive_deletes=True,
95 95 passive_updates=True,
96 96 secondary='reports_assignments',
97 97 order_by="User.user_name")
98 98
99 99 stats = sa.orm.relationship('ReportStat',
100 100 lazy='dynamic',
101 101 backref='report',
102 102 passive_deletes=True,
103 103 passive_updates=True, )
104 104
105 105 last_report_ref = sa.orm.relationship('Report',
106 106 uselist=False,
107 107 primaryjoin="ReportGroup.last_report "
108 108 "== Report.id",
109 109 foreign_keys="Report.id",
110 110 cascade="all, delete-orphan",
111 111 passive_deletes=True,
112 112 passive_updates=True, )
113 113
114 114 def __repr__(self):
115 115 return '<ReportGroup id:{}>'.format(self.id)
116 116
117 117 def get_report(self, report_id=None, public=False):
118 118 """
119 119 Gets report with specific id or latest report if id was not specified
120 120 """
121 121 from .report import Report
122 122
123 123 if not report_id:
124 124 return self.last_report_ref
125 125 else:
126 126 return self.reports.filter(Report.id == report_id).first()
127 127
128 128 def get_public_url(self, request, _app_url=None):
129 129 url = request.route_url('/', _app_url=_app_url)
130 130 return (url + 'ui/report/%s') % self.id
131 131
132 132 def run_postprocessing(self, report):
133 133 """
134 134 Alters report group priority based on postprocessing configuration
135 135 """
136 136 request = get_current_request()
137 137 get_db_session(None, self).flush()
138 138 for action in self.application.postprocess_conf:
139 139 get_db_session(None, self).flush()
140 140 rule_obj = Rule(action.rule, REPORT_TYPE_MATRIX)
141 141 report_dict = report.get_dict(request)
142 142 # if was not processed yet
143 143 if (rule_obj.match(report_dict) and
144 144 action.pkey not in self.triggered_postprocesses_ids):
145 145 action.postprocess(self)
146 146 # this way sqla can track mutation of list
147 147 self.triggered_postprocesses_ids = \
148 148 self.triggered_postprocesses_ids + [action.pkey]
149 149
150 150 get_db_session(None, self).flush()
151 151 # do not go out of bounds
152 152 if self.priority < 1:
153 153 self.priority = 1
154 154 if self.priority > 10:
155 155 self.priority = 10
156 156
157 157 def get_dict(self, request):
158 158 instance_dict = super(ReportGroup, self).get_dict()
159 159 instance_dict['server_name'] = self.get_report().tags.get(
160 160 'server_name')
161 161 instance_dict['view_name'] = self.get_report().tags.get('view_name')
162 162 instance_dict['resource_name'] = self.application.resource_name
163 163 instance_dict['report_type'] = self.get_report().report_type
164 164 instance_dict['url_path'] = self.get_report().url_path
165 165 instance_dict['front_url'] = self.get_report().get_public_url(request)
166 166 del instance_dict['triggered_postprocesses_ids']
167 167 return instance_dict
168 168
169 169 def es_doc(self):
170 170 return {
171 171 '_id': str(self.id),
172 172 'pg_id': str(self.id),
173 173 'resource_id': self.resource_id,
174 174 'error': self.error,
175 175 'fixed': self.fixed,
176 176 'public': self.public,
177 177 'read': self.read,
178 178 'priority': self.priority,
179 179 'occurences': self.occurences,
180 180 'average_duration': self.average_duration,
181 181 'summed_duration': self.summed_duration,
182 182 'first_timestamp': self.first_timestamp,
183 183 'last_timestamp': self.last_timestamp
184 184 }
185 185
186 186 def set_notification_info(self, notify_10=False, notify_100=False):
187 187 """
188 188 Update redis notification maps for notification job
189 189 """
190 190 current_time = datetime.utcnow().replace(second=0, microsecond=0)
191 191 # global app counter
192 192 key = REDIS_KEYS['counters']['reports_per_type'].format(
193 193 self.report_type, current_time)
194 Datastores.redis.incr(key)
195 Datastores.redis.expire(key, 3600 * 24)
194 redis_pipeline = Datastores.redis.pipeline()
195 redis_pipeline.incr(key)
196 redis_pipeline.expire(key, 3600 * 24)
196 197 # detailed app notification for alerts and notifications
197 Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'],
198 self.resource_id)
199 Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports_alerting'],
200 self.resource_id)
198 redis_pipeline.sadd(
199 REDIS_KEYS['apps_that_had_reports'], self.resource_id)
200 redis_pipeline.sadd(
201 REDIS_KEYS['apps_that_had_reports_alerting'], self.resource_id)
201 202 # only notify for exceptions here
202 203 if self.report_type == ReportType.error:
203 Datastores.redis.sadd(
204 REDIS_KEYS['apps_that_had_reports'],
205 self.resource_id)
206 Datastores.redis.sadd(
204 redis_pipeline.sadd(
205 REDIS_KEYS['apps_that_had_reports'], self.resource_id)
206 redis_pipeline.sadd(
207 207 REDIS_KEYS['apps_that_had_error_reports_alerting'],
208 208 self.resource_id)
209 209 key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
210 Datastores.redis.incr(key)
211 Datastores.redis.expire(key, 3600 * 24)
210 redis_pipeline.incr(key)
211 redis_pipeline.expire(key, 3600 * 24)
212 212 key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id)
213 Datastores.redis.incr(key)
214 Datastores.redis.expire(key, 3600 * 24)
213 redis_pipeline.incr(key)
214 redis_pipeline.expire(key, 3600 * 24)
215 215
216 216 if notify_10:
217 217 key = REDIS_KEYS['counters'][
218 218 'report_group_occurences_10th'].format(self.id)
219 Datastores.redis.setex(key, 3600 * 24, 1)
219 redis_pipeline.setex(key, 3600 * 24, 1)
220 220 if notify_100:
221 221 key = REDIS_KEYS['counters'][
222 222 'report_group_occurences_100th'].format(self.id)
223 Datastores.redis.setex(key, 3600 * 24, 1)
223 redis_pipeline.setex(key, 3600 * 24, 1)
224 224
225 225 key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
226 226 self.report_type, self.resource_id)
227 Datastores.redis.sadd(key, self.id)
228 Datastores.redis.expire(key, 3600 * 24)
227 redis_pipeline.sadd(key, self.id)
228 redis_pipeline.expire(key, 3600 * 24)
229 229 key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format(
230 230 self.report_type, self.resource_id)
231 Datastores.redis.sadd(key, self.id)
232 Datastores.redis.expire(key, 3600 * 24)
231 redis_pipeline.sadd(key, self.id)
232 redis_pipeline.expire(key, 3600 * 24)
233 redis_pipeline.execute()
233 234
234 235 @property
235 236 def partition_id(self):
236 237 return 'rcae_r_%s' % self.first_timestamp.strftime('%Y_%m')
237 238
238 239
239 240 def after_insert(mapper, connection, target):
240 241 if not hasattr(target, '_skip_ft_index'):
241 242 data = target.es_doc()
242 243 data.pop('_id', None)
243 244 Datastores.es.index(target.partition_id, 'report_group',
244 245 data, id=target.id)
245 246
246 247
247 248 def after_update(mapper, connection, target):
248 249 if not hasattr(target, '_skip_ft_index'):
249 250 data = target.es_doc()
250 251 data.pop('_id', None)
251 252 Datastores.es.index(target.partition_id, 'report_group',
252 253 data, id=target.id)
253 254
254 255
255 256 def after_delete(mapper, connection, target):
256 257 query = {'term': {'group_id': target.id}}
257 258 # TODO: routing seems unnecessary, need to test a bit more
258 259 #Datastores.es.delete_by_query(target.partition_id, 'report', query,
259 260 # query_params={'routing':str(target.id)})
260 261 Datastores.es.delete_by_query(target.partition_id, 'report', query)
261 262 query = {'term': {'pg_id': target.id}}
262 263 Datastores.es.delete_by_query(target.partition_id, 'report_group', query)
263 264
264 265
265 266 sa.event.listen(ReportGroup, 'after_insert', after_insert)
266 267 sa.event.listen(ReportGroup, 'after_update', after_update)
267 268 sa.event.listen(ReportGroup, 'after_delete', after_delete)
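set_notification_info above now queues all of its counter, set and marker updates on one pipeline and sends them with a single execute(). Since Datastores.redis.pipeline() is called here without transaction=False, redis-py additionally wraps the batch in MULTI/EXEC, so the notification bookkeeping is applied atomically as well as in one round trip. A compact sketch of the same idiom with placeholder keys:

    import redis

    r = redis.StrictRedis()

    pipe = r.pipeline()  # transactional by default: MULTI ... EXEC
    pipe.incr('appenlight:data:example_occurences')
    pipe.expire('appenlight:data:example_occurences', 3600 * 24)
    pipe.sadd('appenlight:data:example_apps_that_had_reports', 42)
    pipe.setex('appenlight:data:example_10th_marker', 3600 * 24, 1)
    pipe.execute()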
@@ -1,200 +1,199 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 import logging
23 23 import os
24 24 import pkg_resources
25 25
26 26 from datetime import datetime, timedelta
27 27
28 28 import psutil
29 29 import redis
30 30
31 31 from pyramid.view import view_config
32 32 from appenlight.models import DBSession
33 33 from appenlight.models import Datastores
34 34 from appenlight.lib.redis_keys import REDIS_KEYS
35 35
36 36
37 37 def bytes2human(total):
38 38 giga = 1024.0 ** 3
39 39 mega = 1024.0 ** 2
40 40 kilo = 1024.0
41 41 if giga <= total:
42 42 return '{:0.1f}G'.format(total / giga)
43 43 elif mega <= total:
44 44 return '{:0.1f}M'.format(total / mega)
45 45 else:
46 46 return '{:0.1f}K'.format(total / kilo)
47 47
48 48
49 49 log = logging.getLogger(__name__)
50 50
51 51
52 52 @view_config(route_name='section_view',
53 53 match_param=['section=admin_section', 'view=system'],
54 54 renderer='json', permission='root_administration')
55 55 def system(request):
56 56 current_time = datetime.utcnow(). \
57 57 replace(second=0, microsecond=0) - timedelta(minutes=1)
58 58 # global app counter
59
60 processed_reports = Datastores.redis.get(
59 processed_reports = request.registry.redis_conn.get(
61 60 REDIS_KEYS['counters']['reports_per_minute'].format(current_time))
62 61 processed_reports = int(processed_reports) if processed_reports else 0
63 processed_logs = Datastores.redis.get(
62 processed_logs = request.registry.redis_conn.get(
64 63 REDIS_KEYS['counters']['logs_per_minute'].format(current_time))
65 64 processed_logs = int(processed_logs) if processed_logs else 0
66 processed_metrics = Datastores.redis.get(
65 processed_metrics = request.registry.redis_conn.get(
67 66 REDIS_KEYS['counters']['metrics_per_minute'].format(current_time))
68 67 processed_metrics = int(processed_metrics) if processed_metrics else 0
69 68
70 69 waiting_reports = 0
71 70 waiting_logs = 0
72 71 waiting_metrics = 0
73 72 waiting_other = 0
74 73
75 74 if 'redis' in request.registry.settings['celery.broker_type']:
76 75 redis_client = redis.StrictRedis.from_url(
77 76 request.registry.settings['celery.broker_url'])
78 77 waiting_reports = redis_client.llen('reports')
79 78 waiting_logs = redis_client.llen('logs')
80 79 waiting_metrics = redis_client.llen('metrics')
81 80 waiting_other = redis_client.llen('default')
82 81
83 82 # process
84 83 def replace_inf(val):
85 84 return val if val != psutil.RLIM_INFINITY else 'unlimited'
86 85
87 86 p = psutil.Process()
88 87 fd = p.rlimit(psutil.RLIMIT_NOFILE)
89 88 memlock = p.rlimit(psutil.RLIMIT_MEMLOCK)
90 89 self_info = {
91 90 'fds': {'soft': replace_inf(fd[0]),
92 91 'hard': replace_inf(fd[1])},
93 92 'memlock': {'soft': replace_inf(memlock[0]),
94 93 'hard': replace_inf(memlock[1])},
95 94 }
96 95
97 96 # disks
98 97 disks = []
99 98 for part in psutil.disk_partitions(all=False):
100 99 if os.name == 'nt':
101 100 if 'cdrom' in part.opts or part.fstype == '':
102 101 continue
103 102 usage = psutil.disk_usage(part.mountpoint)
104 103 disks.append({
105 104 'device': part.device,
106 105 'total': bytes2human(usage.total),
107 106 'used': bytes2human(usage.used),
108 107 'free': bytes2human(usage.free),
109 108 'percentage': int(usage.percent),
110 109 'mountpoint': part.mountpoint,
111 110 'fstype': part.fstype
112 111 })
113 112
114 113 # memory
115 114 memory_v = psutil.virtual_memory()
116 115 memory_s = psutil.swap_memory()
117 116
118 117 memory = {
119 118 'total': bytes2human(memory_v.total),
120 119 'available': bytes2human(memory_v.available),
121 120 'percentage': memory_v.percent,
122 121 'used': bytes2human(memory_v.used),
123 122 'free': bytes2human(memory_v.free),
124 123 'active': bytes2human(memory_v.active),
125 124 'inactive': bytes2human(memory_v.inactive),
126 125 'buffers': bytes2human(memory_v.buffers),
127 126 'cached': bytes2human(memory_v.cached),
128 127 'swap_total': bytes2human(memory_s.total),
129 128 'swap_used': bytes2human(memory_s.used)
130 129 }
131 130
132 131 # load
133 132 system_load = os.getloadavg()
134 133
135 134 # processes
136 135 min_mem = 1024 * 1024 * 40 # 40MB
137 136 process_info = []
138 137 for p in psutil.process_iter():
139 138 mem_used = p.get_memory_info().rss
140 139 if mem_used < min_mem:
141 140 continue
142 141 process_info.append({'owner': p.username(),
143 142 'pid': p.pid,
144 143 'cpu': round(p.get_cpu_percent(interval=0), 1),
145 144 'mem_percentage': round(p.get_memory_percent(),
146 145 1),
147 146 'mem_usage': bytes2human(mem_used),
148 147 'name': p.name(),
149 148 'command': ' '.join(p.cmdline())
150 149 })
151 150 process_info = sorted(process_info, key=lambda x: x['mem_percentage'],
152 151 reverse=True)
153 152
154 153 # pg tables
155 154
156 155 db_size_query = '''
157 156 SELECT tablename, pg_total_relation_size(tablename::text) size
158 157 FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
159 158 tablename NOT LIKE 'sql_%' ORDER BY size DESC;'''
160 159
161 160 db_tables = []
162 161 for row in DBSession.execute(db_size_query):
163 162 db_tables.append({"size_human": bytes2human(row.size),
164 163 "table_name": row.tablename})
165 164
166 165 # es indices
167 166 es_indices = []
168 167 result = Datastores.es.send_request(
169 168 'GET', ['_stats', 'store, docs'], query_params={})
170 169 for ix, stats in result['indices'].items():
171 170 size = stats['primaries']['store']['size_in_bytes']
172 171 es_indices.append({'name': ix,
173 172 'size': size,
174 173 'size_human': bytes2human(size)})
175 174
176 175 # packages
177 176
178 177 packages = ({'name': p.project_name, 'version': p.version}
179 178 for p in pkg_resources.working_set)
180 179
181 180 return {'db_tables': db_tables,
182 181 'es_indices': sorted(es_indices,
183 182 key=lambda x: x['size'], reverse=True),
184 183 'process_info': process_info,
185 184 'system_load': system_load,
186 185 'disks': disks,
187 186 'memory': memory,
188 187 'packages': sorted(packages, key=lambda x: x['name'].lower()),
189 188 'current_time': current_time,
190 189 'queue_stats': {
191 190 'processed_reports': processed_reports,
192 191 'processed_logs': processed_logs,
193 192 'processed_metrics': processed_metrics,
194 193 'waiting_reports': waiting_reports,
195 194 'waiting_logs': waiting_logs,
196 195 'waiting_metrics': waiting_metrics,
197 196 'waiting_other': waiting_other
198 197 },
199 198 'self_info': self_info
200 199 }
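The admin view above reads the three per-minute counters with separate GET calls on request.registry.redis_conn. It is not part of this diff, but the same pipeline idiom could batch those reads into one round trip as well; a hypothetical sketch:

    from datetime import datetime, timedelta

    from appenlight.lib.redis_keys import REDIS_KEYS


    def read_queue_counters(redis_conn):
        # hypothetical helper, not introduced by the diff
        current_time = datetime.utcnow().replace(
            second=0, microsecond=0) - timedelta(minutes=1)
        pipe = redis_conn.pipeline()
        pipe.get(REDIS_KEYS['counters']['reports_per_minute'].format(current_time))
        pipe.get(REDIS_KEYS['counters']['logs_per_minute'].format(current_time))
        pipe.get(REDIS_KEYS['counters']['metrics_per_minute'].format(current_time))
        reports, logs, metrics = pipe.execute()
        return int(reports or 0), int(logs or 0), int(metrics or 0)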