tasks: better counters
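
The change below extends the existing per-app Redis counters in add_reports, add_logs and add_metrics with a per-user, per-minute events_per_minute_per_user counter, and registers the matching key template in redis_keys.py. As a rough illustration of the pattern the commit repeats in all three tasks, here is a minimal standalone sketch; the redis_client, bump_per_user_counter, owner_user_id and count names are placeholders for this sketch, not part of the diff, while the key layout and the one-hour TTL mirror the code below.

from datetime import datetime

import redis

redis_client = redis.StrictRedis()  # placeholder client for the sketch


def bump_per_user_counter(owner_user_id, count):
    # bucket events per minute, matching current_time in the tasks below
    current_time = datetime.utcnow().replace(second=0, microsecond=0)
    key = 'appenlight:data:events_per_minute_per_user:{}:{}'.format(
        owner_user_id, current_time)
    # same non-transactional pipeline style as the surrounding counters
    pipeline = redis_client.pipeline(transaction=False)
    pipeline.incr(key, count)
    pipeline.expire(key, 3600)  # per-user minute buckets live for an hour
    pipeline.execute()
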
@@ -1,650 +1,663 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 import bisect
23 23 import collections
24 24 import math
25 25 from datetime import datetime, timedelta
26 26
27 27 import sqlalchemy as sa
28 28 import pyelasticsearch
29 29
30 30 from celery.utils.log import get_task_logger
31 31 from zope.sqlalchemy import mark_changed
32 32 from pyramid.threadlocal import get_current_request, get_current_registry
33 33 from appenlight.celery import celery
34 34 from appenlight.models.report_group import ReportGroup
35 35 from appenlight.models import DBSession, Datastores
36 36 from appenlight.models.report import Report
37 37 from appenlight.models.log import Log
38 38 from appenlight.models.metric import Metric
39 39 from appenlight.models.event import Event
40 40
41 41 from appenlight.models.services.application import ApplicationService
42 42 from appenlight.models.services.event import EventService
43 43 from appenlight.models.services.log import LogService
44 44 from appenlight.models.services.report import ReportService
45 45 from appenlight.models.services.report_group import ReportGroupService
46 46 from appenlight.models.services.user import UserService
47 47 from appenlight.models.tag import Tag
48 48 from appenlight.lib import print_traceback
49 49 from appenlight.lib.utils import parse_proto, in_batches
50 50 from appenlight.lib.ext_json import json
51 51 from appenlight.lib.redis_keys import REDIS_KEYS
52 52 from appenlight.lib.enums import ReportType
53 53
54 54 log = get_task_logger(__name__)
55 55
56 56 sample_boundries = list(range(100, 1000, 100)) + \
57 57 list(range(1000, 10000, 1000)) + \
58 58 list(range(10000, 100000, 5000))
59 59
60 60
61 61 def pick_sample(total_occurences, report_type=None):
62 62 every = 1.0
63 63 position = bisect.bisect_left(sample_boundries, total_occurences)
64 64 if position > 0:
65 65 if report_type == ReportType.not_found:
66 66 divide = 10.0
67 67 else:
68 68 divide = 100.0
69 69 every = sample_boundries[position - 1] / divide
70 70 return total_occurences % every == 0
71 71
72 72
73 73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
74 74 def test_exception_task():
75 75 log.error('test celery log', extra={'location': 'celery'})
76 76 log.warning('test celery log', extra={'location': 'celery'})
77 77 raise Exception('Celery exception test')
78 78
79 79
80 80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
81 81 def test_retry_exception_task():
82 82 try:
83 83 import time
84 84
85 85 time.sleep(1.3)
86 86 log.error('test retry celery log', extra={'location': 'celery'})
87 87 log.warning('test retry celery log', extra={'location': 'celery'})
88 88 raise Exception('Celery exception test')
89 89 except Exception as exc:
90 90 test_retry_exception_task.retry(exc=exc)
91 91
92 92
93 93 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
94 94 def add_reports(resource_id, request_params, dataset, **kwargs):
95 95 proto_version = parse_proto(request_params.get('protocol_version', ''))
96 96 current_time = datetime.utcnow().replace(second=0, microsecond=0)
97 97 try:
98 98 # we will store ES docs here for a single insert
99 99 es_report_docs = {}
100 100 es_report_group_docs = {}
101 101 resource = ApplicationService.by_id(resource_id)
102 102
103 103 tags = []
104 104 es_slow_calls_docs = {}
105 105 es_reports_stats_rows = {}
106 106 for report_data in dataset:
107 107 # build report details for later
108 108 added_details = 0
109 109 report = Report()
110 110 report.set_data(report_data, resource, proto_version)
111 111 report._skip_ft_index = True
112 112
113 113 report_group = ReportGroupService.by_hash_and_resource(
114 114 report.resource_id,
115 115 report.grouping_hash
116 116 )
117 117 occurences = report_data.get('occurences', 1)
118 118 if not report_group:
119 119 # total_reports will be incremented a moment later
120 120 report_group = ReportGroup(grouping_hash=report.grouping_hash,
121 121 occurences=0, total_reports=0,
122 122 last_report=0,
123 123 priority=report.priority,
124 124 error=report.error,
125 125 first_timestamp=report.start_time)
126 126 report_group._skip_ft_index = True
127 127 report_group.report_type = report.report_type
128 128 report.report_group_time = report_group.first_timestamp
129 129 add_sample = pick_sample(report_group.occurences,
130 130 report_type=report_group.report_type)
131 131 if add_sample:
132 132 resource.report_groups.append(report_group)
133 133 report_group.reports.append(report)
134 134 added_details += 1
135 135 DBSession.flush()
136 136 if report.partition_id not in es_report_docs:
137 137 es_report_docs[report.partition_id] = []
138 138 es_report_docs[report.partition_id].append(report.es_doc())
139 139 tags.extend(list(report.tags.items()))
140 140 slow_calls = report.add_slow_calls(report_data, report_group)
141 141 DBSession.flush()
142 142 for s_call in slow_calls:
143 143 if s_call.partition_id not in es_slow_calls_docs:
144 144 es_slow_calls_docs[s_call.partition_id] = []
145 145 es_slow_calls_docs[s_call.partition_id].append(
146 146 s_call.es_doc())
147 147 # try generating new stat rows if needed
148 148 else:
149 149 # required for postprocessing to not fail later
150 150 report.report_group = report_group
151 151
152 152 stat_row = ReportService.generate_stat_rows(
153 153 report, resource, report_group)
154 154 if stat_row.partition_id not in es_reports_stats_rows:
155 155 es_reports_stats_rows[stat_row.partition_id] = []
156 156 es_reports_stats_rows[stat_row.partition_id].append(
157 157 stat_row.es_doc())
158 158
159 159 # see if we should mark 10th occurrence of report
160 160 last_occurences_10 = int(math.floor(report_group.occurences / 10))
161 161 curr_occurences_10 = int(math.floor(
162 162 (report_group.occurences + report.occurences) / 10))
163 163 last_occurences_100 = int(
164 164 math.floor(report_group.occurences / 100))
165 165 curr_occurences_100 = int(math.floor(
166 166 (report_group.occurences + report.occurences) / 100))
167 167 notify_occurences_10 = last_occurences_10 != curr_occurences_10
168 168 notify_occurences_100 = last_occurences_100 != curr_occurences_100
169 169 report_group.occurences = ReportGroup.occurences + occurences
170 170 report_group.last_timestamp = report.start_time
171 171 report_group.summed_duration = ReportGroup.summed_duration + report.duration
172 172 summed_duration = ReportGroup.summed_duration + report.duration
173 173 summed_occurences = ReportGroup.occurences + occurences
174 174 report_group.average_duration = summed_duration / summed_occurences
175 175 report_group.run_postprocessing(report)
176 176 if added_details:
177 177 report_group.total_reports = ReportGroup.total_reports + 1
178 178 report_group.last_report = report.id
179 179 report_group.set_notification_info(notify_10=notify_occurences_10,
180 180 notify_100=notify_occurences_100)
181 181 DBSession.flush()
182 182 report_group.get_report().notify_channel(report_group)
183 183 if report_group.partition_id not in es_report_group_docs:
184 184 es_report_group_docs[report_group.partition_id] = []
185 185 es_report_group_docs[report_group.partition_id].append(
186 186 report_group.es_doc())
187 187
188 188 action = 'REPORT'
189 189 log_msg = '%s: %s %s, client: %s, proto: %s' % (
190 190 action,
191 191 report_data.get('http_status', 'unknown'),
192 192 str(resource),
193 193 report_data.get('client'),
194 194 proto_version)
195 195 log.info(log_msg)
196 196 total_reports = len(dataset)
197 197 redis_pipeline = Datastores.redis.pipeline(transaction=False)
198 198 key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
199 199 redis_pipeline.incr(key, total_reports)
200 200 redis_pipeline.expire(key, 3600 * 24)
201 key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
202 resource.owner_user_id, current_time)
203 redis_pipeline.incr(key, total_reports)
204 redis_pipeline.expire(key, 3600)
201 205 key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format(
202 206 resource_id, current_time.replace(minute=0))
203 207 redis_pipeline.incr(key, total_reports)
204 208 redis_pipeline.expire(key, 3600 * 24 * 7)
205 209 redis_pipeline.sadd(
206 210 REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
207 211 current_time.replace(minute=0)), resource_id)
208 212 redis_pipeline.execute()
209 213
210 214 add_reports_es(es_report_group_docs, es_report_docs)
211 215 add_reports_slow_calls_es(es_slow_calls_docs)
212 216 add_reports_stats_rows_es(es_reports_stats_rows)
213 217 return True
214 218 except Exception as exc:
215 219 print_traceback(log)
216 220 add_reports.retry(exc=exc)
217 221
218 222
219 223 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
220 224 def add_reports_es(report_group_docs, report_docs):
221 225 for k, v in report_group_docs.items():
222 226 Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
223 227 for k, v in report_docs.items():
224 228 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
225 229 parent_field='_parent')
226 230
227 231
228 232 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
229 233 def add_reports_slow_calls_es(es_docs):
230 234 for k, v in es_docs.items():
231 235 Datastores.es.bulk_index(k, 'log', v)
232 236
233 237
234 238 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
235 239 def add_reports_stats_rows_es(es_docs):
236 240 for k, v in es_docs.items():
237 241 Datastores.es.bulk_index(k, 'log', v)
238 242
239 243
240 244 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
241 245 def add_logs(resource_id, request_params, dataset, **kwargs):
242 246 proto_version = request_params.get('protocol_version')
243 247 current_time = datetime.utcnow().replace(second=0, microsecond=0)
244 248
245 249 try:
246 250 es_docs = collections.defaultdict(list)
247 application = ApplicationService.by_id(resource_id)
251 resource = ApplicationService.by_id_cached()(resource_id)
252 resource = DBSession.merge(resource, load=False)
248 253 ns_pairs = []
249 254 for entry in dataset:
250 255 # gather pk and ns so we can remove older versions of row later
251 256 if entry['primary_key'] is not None:
252 257 ns_pairs.append({"pk": entry['primary_key'],
253 258 "ns": entry['namespace']})
254 259 log_entry = Log()
255 log_entry.set_data(entry, resource=application)
260 log_entry.set_data(entry, resource=resource)
256 261 log_entry._skip_ft_index = True
257 application.logs.append(log_entry)
262 resource.logs.append(log_entry)
258 263 DBSession.flush()
259 264 # insert non pk rows first
260 265 if entry['primary_key'] is None:
261 266 es_docs[log_entry.partition_id].append(log_entry.es_doc())
262 267
263 268 # 2nd pass to delete all log entries from db for the same pk/ns pair
264 269 if ns_pairs:
265 270 ids_to_delete = []
266 271 es_docs = collections.defaultdict(list)
267 272 es_docs_to_delete = collections.defaultdict(list)
268 273 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
269 274 list_of_pairs=ns_pairs)
270 275 log_dict = {}
271 276 for log_entry in found_pkey_logs:
272 277 log_key = (log_entry.primary_key, log_entry.namespace)
273 278 if log_key not in log_dict:
274 279 log_dict[log_key] = []
275 280 log_dict[log_key].append(log_entry)
276 281
277 282 for ns, entry_list in log_dict.items():
278 283 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
279 284 # newest row needs to be indexed in es
280 285 log_entry = entry_list[-1]
281 286 # delete everything from pg and ES, leave the last row in pg
282 287 for e in entry_list[:-1]:
283 288 ids_to_delete.append(e.log_id)
284 289 es_docs_to_delete[e.partition_id].append(e.delete_hash)
285 290
286 291 es_docs_to_delete[log_entry.partition_id].append(
287 292 log_entry.delete_hash)
288 293
289 294 es_docs[log_entry.partition_id].append(log_entry.es_doc())
290 295
291 296 if ids_to_delete:
292 297 query = DBSession.query(Log).filter(
293 298 Log.log_id.in_(ids_to_delete))
294 299 query.delete(synchronize_session=False)
295 300 if es_docs_to_delete:
296 301 # batch this to avoid problems with default ES bulk limits
297 302 for es_index in es_docs_to_delete.keys():
298 303 for batch in in_batches(es_docs_to_delete[es_index], 20):
299 304 query = {'terms': {'delete_hash': batch}}
300 305
301 306 try:
302 307 Datastores.es.delete_by_query(
303 308 es_index, 'log', query)
304 309 except pyelasticsearch.ElasticHttpNotFoundError as exc:
305 310 msg = 'skipping index {}'.format(es_index)
306 311 log.info(msg)
307 312
308 313 total_logs = len(dataset)
309 314
310 315 log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
311 str(application),
316 str(resource),
312 317 total_logs,
313 318 proto_version)
314 319 log.info(log_msg)
315 320 # mark_changed(session)
316 321 redis_pipeline = Datastores.redis.pipeline(transaction=False)
317 322 key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
318 323 redis_pipeline.incr(key, total_logs)
319 324 redis_pipeline.expire(key, 3600 * 24)
325 key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
326 resource.owner_user_id, current_time)
327 redis_pipeline.incr(key, total_logs)
328 redis_pipeline.expire(key, 3600)
320 329 key = REDIS_KEYS['counters']['logs_per_hour_per_app'].format(
321 330 resource_id, current_time.replace(minute=0))
322 331 redis_pipeline.incr(key, total_logs)
323 332 redis_pipeline.expire(key, 3600 * 24 * 7)
324 333 redis_pipeline.sadd(
325 334 REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
326 335 current_time.replace(minute=0)), resource_id)
327 336 redis_pipeline.execute()
328 337 add_logs_es(es_docs)
329 338 return True
330 339 except Exception as exc:
331 340 print_traceback(log)
332 341 add_logs.retry(exc=exc)
333 342
334 343
335 344 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
336 345 def add_logs_es(es_docs):
337 346 for k, v in es_docs.items():
338 347 Datastores.es.bulk_index(k, 'log', v)
339 348
340 349
341 350 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
342 351 def add_metrics(resource_id, request_params, dataset, proto_version):
343 352 current_time = datetime.utcnow().replace(second=0, microsecond=0)
344 353 try:
345 application = ApplicationService.by_id_cached()(resource_id)
346 application = DBSession.merge(application, load=False)
354 resource = ApplicationService.by_id_cached()(resource_id)
355 resource = DBSession.merge(resource, load=False)
347 356 es_docs = []
348 357 rows = []
349 358 for metric in dataset:
350 359 tags = dict(metric['tags'])
351 360 server_n = tags.get('server_name', metric['server_name']).lower()
352 361 tags['server_name'] = server_n or 'unknown'
353 362 new_metric = Metric(
354 363 timestamp=metric['timestamp'],
355 resource_id=application.resource_id,
364 resource_id=resource.resource_id,
356 365 namespace=metric['namespace'],
357 366 tags=tags)
358 367 rows.append(new_metric)
359 368 es_docs.append(new_metric.es_doc())
360 369 session = DBSession()
361 370 session.bulk_save_objects(rows)
362 371 session.flush()
363 372
364 373 action = 'METRICS'
365 374 metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
366 375 action,
367 str(application),
376 str(resource),
368 377 len(dataset),
369 378 proto_version
370 379 )
371 380 log.info(metrics_msg)
372 381
373 382 mark_changed(session)
374 383 redis_pipeline = Datastores.redis.pipeline(transaction=False)
375 384 key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
376 385 redis_pipeline.incr(key, len(rows))
377 386 redis_pipeline.expire(key, 3600 * 24)
387 key = REDIS_KEYS['counters']['events_per_minute_per_user'].format(
388 resource.owner_user_id, current_time)
389 redis_pipeline.incr(key, len(rows))
390 redis_pipeline.expire(key, 3600)
378 391 key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format(
379 392 resource_id, current_time.replace(minute=0))
380 393 redis_pipeline.incr(key, len(rows))
381 394 redis_pipeline.expire(key, 3600 * 24 * 7)
382 395 redis_pipeline.sadd(
383 396 REDIS_KEYS['apps_that_got_new_data_per_hour'].format(
384 397 current_time.replace(minute=0)), resource_id)
385 398 redis_pipeline.execute()
386 399 add_metrics_es(es_docs)
387 400 return True
388 401 except Exception as exc:
389 402 print_traceback(log)
390 403 add_metrics.retry(exc=exc)
391 404
392 405
393 406 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
394 407 def add_metrics_es(es_docs):
395 408 for doc in es_docs:
396 409 partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
397 410 Datastores.es.index(partition, 'log', doc)
398 411
399 412
400 413 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
401 414 def check_user_report_notifications(resource_id):
402 415 since_when = datetime.utcnow()
403 416 try:
404 417 request = get_current_request()
405 418 application = ApplicationService.by_id(resource_id)
406 419 if not application:
407 420 return
408 421 error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
409 422 ReportType.error, resource_id)
410 423 slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
411 424 ReportType.slow, resource_id)
412 425 error_group_ids = Datastores.redis.smembers(error_key)
413 426 slow_group_ids = Datastores.redis.smembers(slow_key)
414 427 Datastores.redis.delete(error_key)
415 428 Datastores.redis.delete(slow_key)
416 429 err_gids = [int(g_id) for g_id in error_group_ids]
417 430 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
418 431 group_ids = err_gids + slow_gids
419 432 occurence_dict = {}
420 433 for g_id in group_ids:
421 434 key = REDIS_KEYS['counters']['report_group_occurences'].format(
422 435 g_id)
423 436 val = Datastores.redis.get(key)
424 437 Datastores.redis.delete(key)
425 438 if val:
426 439 occurence_dict[g_id] = int(val)
427 440 else:
428 441 occurence_dict[g_id] = 1
429 442 report_groups = ReportGroupService.by_ids(group_ids)
430 443 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
431 444
432 445 ApplicationService.check_for_groups_alert(
433 446 application, 'alert', report_groups=report_groups,
434 447 occurence_dict=occurence_dict)
435 448 users = set([p.user for p in application.users_for_perm('view')])
436 449 report_groups = report_groups.all()
437 450 for user in users:
438 451 UserService.report_notify(user, request, application,
439 452 report_groups=report_groups,
440 453 occurence_dict=occurence_dict)
441 454 for group in report_groups:
442 455 # marks report_groups as notified
443 456 if not group.notified:
444 457 group.notified = True
445 458 except Exception as exc:
446 459 print_traceback(log)
447 460 raise
448 461
449 462
450 463 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
451 464 def check_alerts(resource_id):
452 465 since_when = datetime.utcnow()
453 466 try:
454 467 request = get_current_request()
455 468 application = ApplicationService.by_id(resource_id)
456 469 if not application:
457 470 return
458 471 error_key = REDIS_KEYS[
459 472 'reports_to_notify_per_type_per_app_alerting'].format(
460 473 ReportType.error, resource_id)
461 474 slow_key = REDIS_KEYS[
462 475 'reports_to_notify_per_type_per_app_alerting'].format(
463 476 ReportType.slow, resource_id)
464 477 error_group_ids = Datastores.redis.smembers(error_key)
465 478 slow_group_ids = Datastores.redis.smembers(slow_key)
466 479 Datastores.redis.delete(error_key)
467 480 Datastores.redis.delete(slow_key)
468 481 err_gids = [int(g_id) for g_id in error_group_ids]
469 482 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
470 483 group_ids = err_gids + slow_gids
471 484 occurence_dict = {}
472 485 for g_id in group_ids:
473 486 key = REDIS_KEYS['counters'][
474 487 'report_group_occurences_alerting'].format(
475 488 g_id)
476 489 val = Datastores.redis.get(key)
477 490 Datastores.redis.delete(key)
478 491 if val:
479 492 occurence_dict[g_id] = int(val)
480 493 else:
481 494 occurence_dict[g_id] = 1
482 495 report_groups = ReportGroupService.by_ids(group_ids)
483 496 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
484 497
485 498 ApplicationService.check_for_groups_alert(
486 499 application, 'alert', report_groups=report_groups,
487 500 occurence_dict=occurence_dict, since_when=since_when)
488 501 except Exception as exc:
489 502 print_traceback(log)
490 503 raise
491 504
492 505
493 506 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
494 507 def close_alerts():
495 508 log.warning('Checking alerts')
496 509 since_when = datetime.utcnow()
497 510 try:
498 511 event_types = [Event.types['error_report_alert'],
499 512 Event.types['slow_report_alert'], ]
500 513 statuses = [Event.statuses['active']]
501 514 # get events older than 5 min
502 515 events = EventService.by_type_and_status(
503 516 event_types,
504 517 statuses,
505 518 older_than=(since_when - timedelta(minutes=5)))
506 519 for event in events:
507 520 # see if we can close them
508 521 event.validate_or_close(
509 522 since_when=(since_when - timedelta(minutes=1)))
510 523 except Exception as exc:
511 524 print_traceback(log)
512 525 raise
513 526
514 527
515 528 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
516 529 def update_tag_counter(tag_name, tag_value, count):
517 530 try:
518 531 query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
519 532 sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
520 533 sa.types.TEXT))
521 534 query.update({'times_seen': Tag.times_seen + count,
522 535 'last_timestamp': datetime.utcnow()},
523 536 synchronize_session=False)
524 537 session = DBSession()
525 538 mark_changed(session)
526 539 return True
527 540 except Exception as exc:
528 541 print_traceback(log)
529 542 update_tag_counter.retry(exc=exc)
530 543
531 544
532 545 @celery.task(queue="default")
533 546 def update_tag_counters():
534 547 """
535 548 Sets task to update counters for application tags
536 549 """
537 550 tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
538 551 Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
539 552 c = collections.Counter(tags)
540 553 for t_json, count in c.items():
541 554 tag_info = json.loads(t_json)
542 555 update_tag_counter.delay(tag_info[0], tag_info[1], count)
543 556
544 557
545 558 @celery.task(queue="default")
546 559 def daily_digest():
547 560 """
548 561 Sends daily digest with top 50 error reports
549 562 """
550 563 request = get_current_request()
551 564 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
552 565 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
553 566 since_when = datetime.utcnow() - timedelta(hours=8)
554 567 log.warning('Generating daily digests')
555 568 for resource_id in apps:
556 569 resource_id = resource_id.decode('utf8')
557 570 end_date = datetime.utcnow().replace(microsecond=0, second=0)
558 571 filter_settings = {'resource': [resource_id],
559 572 'tags': [{'name': 'type',
560 573 'value': ['error'], 'op': None}],
561 574 'type': 'error', 'start_date': since_when,
562 575 'end_date': end_date}
563 576
564 577 reports = ReportGroupService.get_trending(
565 578 request, filter_settings=filter_settings, limit=50)
566 579
567 580 application = ApplicationService.by_id(resource_id)
568 581 if application:
569 582 users = set([p.user for p in application.users_for_perm('view')])
570 583 for user in users:
571 584 user.send_digest(request, application, reports=reports,
572 585 since_when=since_when)
573 586
574 587
575 588 @celery.task(queue="default")
576 589 def notifications_reports():
577 590 """
578 591 Loop that checks redis for info and then issues new tasks to celery to
579 592 issue notifications
580 593 """
581 594 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
582 595 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
583 596 for app in apps:
584 597 log.warning('Notify for app: %s' % app)
585 598 check_user_report_notifications.delay(app.decode('utf8'))
586 599
587 600 @celery.task(queue="default")
588 601 def alerting_reports():
589 602 """
590 603 Loop that checks redis for info and then issues new tasks to celery to
591 604 perform the following:
592 605 - which applications should have new alerts opened
593 606 """
594 607
595 608 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
596 609 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
597 610 for app in apps:
598 611 log.warning('Notify for app: %s' % app)
599 612 check_alerts.delay(app.decode('utf8'))
600 613
601 614
602 615 @celery.task(queue="default", soft_time_limit=3600 * 4,
603 616 hard_time_limit=3600 * 4, max_retries=144)
604 617 def logs_cleanup(resource_id, filter_settings):
605 618 request = get_current_request()
606 619 request.tm.begin()
607 620 es_query = {
608 621 "_source": False,
609 622 "size": 5000,
610 623 "query": {
611 624 "filtered": {
612 625 "filter": {
613 626 "and": [{"term": {"resource_id": resource_id}}]
614 627 }
615 628 }
616 629 }
617 630 }
618 631
619 632 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
620 633 if filter_settings['namespace']:
621 634 query = query.filter(Log.namespace == filter_settings['namespace'][0])
622 635 es_query['query']['filtered']['filter']['and'].append(
623 636 {"term": {"namespace": filter_settings['namespace'][0]}}
624 637 )
625 638 query.delete(synchronize_session=False)
626 639 request.tm.commit()
627 640 result = request.es_conn.search(es_query, index='rcae_l_*',
628 641 doc_type='log', es_scroll='1m',
629 642 es_search_type='scan')
630 643 scroll_id = result['_scroll_id']
631 644 while True:
632 645 log.warning('log_cleanup, app:{} ns:{} batch'.format(
633 646 resource_id,
634 647 filter_settings['namespace']
635 648 ))
636 649 es_docs_to_delete = []
637 650 result = request.es_conn.send_request(
638 651 'POST', ['_search', 'scroll'],
639 652 body=scroll_id, query_params={"scroll": '1m'})
640 653 scroll_id = result['_scroll_id']
641 654 if not result['hits']['hits']:
642 655 break
643 656 for doc in result['hits']['hits']:
644 657 es_docs_to_delete.append({"id": doc['_id'],
645 658 "index": doc['_index']})
646 659
647 660 for batch in in_batches(es_docs_to_delete, 10):
648 661 Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
649 662 **to_del)
650 663 for to_del in batch])
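
For reference, the pick_sample() helper near the top of this file controls which reports in a growing group still get full detail rows stored. The standalone check below is a simplified copy of it (the report_type branch is folded into a divide argument; the original picks 10.0 for not_found reports and 100.0 otherwise), with made-up occurrence counts chosen only to illustrate the thinning behaviour.

import bisect

# same boundaries as sample_boundries defined near the top of this file
sample_boundries = list(range(100, 1000, 100)) + \
                   list(range(1000, 10000, 1000)) + \
                   list(range(10000, 100000, 5000))


def pick_sample(total_occurences, divide=100.0):
    # every report is kept until the first boundary (100); after that only
    # every (previous_boundary / divide)-th occurrence is stored in full
    every = 1.0
    position = bisect.bisect_left(sample_boundries, total_occurences)
    if position > 0:
        every = sample_boundries[position - 1] / divide
    return total_occurences % every == 0


assert pick_sample(55)         # below the first boundary: keep every report
assert pick_sample(2500)       # past 2000 occurrences: keep every 20th
assert not pick_sample(2510)
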
@@ -1,68 +1,70 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 BASE = 'appenlight:data:{}'
23 23
24 24 REDIS_KEYS = {
25 25 'tasks': {
26 26 'add_reports_lock': BASE.format('add_reports_lock:{}'),
27 27 'add_logs_lock': BASE.format('add_logs_lock:{}'),
28 28 },
29 29 'counters': {
30 'events_per_minute_per_user': BASE.format(
31 'events_per_minute_per_user:{}:{}'),
30 32 'reports_per_minute': BASE.format('reports_per_minute:{}'),
31 33 'reports_per_hour_per_app': BASE.format(
32 34 'reports_per_hour_per_app:{}:{}'),
33 35 'reports_per_type': BASE.format('reports_per_type:{}'),
34 36 'logs_per_minute': BASE.format('logs_per_minute:{}'),
35 37 'logs_per_hour_per_app': BASE.format(
36 38 'logs_per_hour_per_app:{}:{}'),
37 39 'metrics_per_minute': BASE.format('metrics_per_minute:{}'),
38 40 'metrics_per_hour_per_app': BASE.format(
39 41 'metrics_per_hour_per_app:{}:{}'),
40 42 'report_group_occurences': BASE.format('report_group_occurences:{}'),
41 43 'report_group_occurences_alerting': BASE.format(
42 44 'report_group_occurences_alerting:{}'),
43 45 'report_group_occurences_10th': BASE.format(
44 46 'report_group_occurences_10th:{}'),
45 47 'report_group_occurences_100th': BASE.format(
46 48 'report_group_occurences_100th:{}'),
47 49 },
48 50 'rate_limits': {
49 51 'per_application_reports_rate_limit': BASE.format(
50 52 'per_application_reports_limit:{}:{}'),
51 53 'per_application_logs_rate_limit': BASE.format(
52 54 'per_application_logs_rate_limit:{}:{}'),
53 55 'per_application_metrics_rate_limit': BASE.format(
54 56 'per_application_metrics_rate_limit:{}:{}'),
55 57 },
56 58 'apps_that_got_new_data_per_hour': BASE.format('apps_that_got_new_data_per_hour:{}'),
57 59 'apps_that_had_reports': BASE.format('apps_that_had_reports'),
58 60 'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
59 61 'apps_that_had_reports_alerting': BASE.format(
60 62 'apps_that_had_reports_alerting'),
61 63 'apps_that_had_error_reports_alerting': BASE.format(
62 64 'apps_that_had_error_reports_alerting'),
63 65 'reports_to_notify_per_type_per_app': BASE.format(
64 66 'reports_to_notify_per_type_per_app:{}:{}'),
65 67 'reports_to_notify_per_type_per_app_alerting': BASE.format(
66 68 'reports_to_notify_per_type_per_app_alerting:{}:{}'),
67 69 'seen_tag_list': BASE.format('seen_tag_list')
68 70 }
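
As a quick illustration of how these templates expand, the new per-user counter key combines the BASE prefix, the owner's user id and a minute-resolution timestamp; the id and timestamp in this sketch are made-up values.

from datetime import datetime

BASE = 'appenlight:data:{}'
key_tmpl = BASE.format('events_per_minute_per_user:{}:{}')
minute_bucket = datetime(2016, 1, 1, 12, 30)  # illustrative minute bucket
print(key_tmpl.format(5, minute_bucket))
# appenlight:data:events_per_minute_per_user:5:2016-01-01 12:30:00
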