celery: decouple report notifications from alerts
ergo
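
In short, the single `alerting` beat job is split into two independent pipelines: `notifications_reports` (per-user report notification e-mails) and `alerting_reports` (alert evaluation), with `close_alerts` now scheduled on its own instead of being chained from the alerting loop. A minimal sketch, assuming default settings, of the beat schedule that results once `configure_celery()` in the first hunk below has run (all task names and intervals are taken from the diff itself):

    from datetime import timedelta
    from celery.schedules import crontab

    CELERYBEAT_SCHEDULE = {
        'alerting_reports': {
            'task': 'appenlight.celery.tasks.alerting_reports',
            'schedule': timedelta(seconds=60),
        },
        'close_alerts': {
            'task': 'appenlight.celery.tasks.close_alerts',
            'schedule': timedelta(seconds=60),
        },
        # added dynamically by configure_celery() from the new
        # tasks.notifications_reports.interval setting (defaults to 60 seconds)
        'notifications': {
            'task': 'appenlight.celery.tasks.notifications_reports',
            'schedule': timedelta(seconds=60),
        },
        'daily_digest': {
            'task': 'appenlight.celery.tasks.daily_digest',
            'schedule': crontab(minute=1, hour='4,12,20'),
        },
    }
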
@@ -1,161 +1,176 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # App Enlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 import logging
23 23
24 24 from datetime import timedelta
25 25 from celery import Celery
26 26 from celery.bin import Option
27 27 from celery.schedules import crontab
28 28 from celery.signals import worker_init, task_revoked, user_preload_options
29 29 from celery.signals import task_prerun, task_retry, task_failure, task_success
30 30 from kombu.serialization import register
31 31 from pyramid.paster import bootstrap
32 32 from pyramid.request import Request
33 33 from pyramid.scripting import prepare
34 34 from pyramid.settings import asbool
35 35 from pyramid.threadlocal import get_current_request
36 36
37 37 from appenlight.celery.encoders import json_dumps, json_loads
38 38 from appenlight_client.ext.celery import register_signals
39 39
40 40 log = logging.getLogger(__name__)
41 41
42 42 register('date_json', json_dumps, json_loads,
43 43 content_type='application/x-date_json',
44 44 content_encoding='utf-8')
45 45
46 46 celery = Celery()
47 47
48 48 celery.user_options['preload'].add(
49 49 Option('--ini', dest='ini', default=None,
50 50 help='Specifies pyramid configuration file location.')
51 51 )
52 52
53
53 54 @user_preload_options.connect
54 55 def on_preload_parsed(options, **kwargs):
55 56 """
56 57 This actually configures celery from pyramid config file
57 58 """
58 59 celery.conf['INI_PYRAMID'] = options['ini']
59 60 import appenlight_client.client as e_client
60 61 ini_location = options['ini']
61 62 if not ini_location:
62 63 raise Exception('You need to pass pyramid ini location using '
63 64 '--ini=filename.ini argument to the worker')
64 65 env = bootstrap(ini_location)
65 66 api_key = env['request'].registry.settings['appenlight.api_key']
66 67 tr_config = env['request'].registry.settings.get(
67 68 'appenlight.transport_config')
68 69 CONFIG = e_client.get_config({'appenlight.api_key': api_key})
69 70 if tr_config:
70 71 CONFIG['appenlight.transport_config'] = tr_config
71 72 APPENLIGHT_CLIENT = e_client.Client(CONFIG)
72 73 # log.addHandler(APPENLIGHT_CLIENT.log_handler)
73 74 register_signals(APPENLIGHT_CLIENT)
74 75 celery.pyramid = env
75 76
76 77
77 78 celery_config = {
78 79 'CELERY_IMPORTS': ["appenlight.celery.tasks",],
79 80 'CELERYD_TASK_TIME_LIMIT': 60,
80 81 'CELERYD_MAX_TASKS_PER_CHILD': 1000,
81 82 'CELERY_IGNORE_RESULT': True,
82 83 'CELERY_ACCEPT_CONTENT': ['date_json'],
83 84 'CELERY_TASK_SERIALIZER': 'date_json',
84 85 'CELERY_RESULT_SERIALIZER': 'date_json',
85 86 'BROKER_URL': None,
86 87 'CELERYD_CONCURRENCY': None,
87 88 'CELERY_TIMEZONE': None,
88 89 'CELERYBEAT_SCHEDULE': {
89 'alerting': {
90 'task': 'appenlight.celery.tasks.alerting',
90 'alerting_reports': {
91 'task': 'appenlight.celery.tasks.alerting_reports',
91 92 'schedule': timedelta(seconds=60)
92 93 },
93 'daily_digest': {
94 'task': 'appenlight.celery.tasks.daily_digest',
95 'schedule': crontab(minute=1, hour='4,12,20')
96 },
94 'close_alerts': {
95 'task': 'appenlight.celery.tasks.close_alerts',
96 'schedule': timedelta(seconds=60)
97 }
97 98 }
98 99 }
99 100 celery.config_from_object(celery_config)
100 101
102
101 103 def configure_celery(pyramid_registry):
102 104 settings = pyramid_registry.settings
103 105 celery_config['BROKER_URL'] = settings['celery.broker_url']
104 106 celery_config['CELERYD_CONCURRENCY'] = settings['celery.concurrency']
105 107 celery_config['CELERY_TIMEZONE'] = settings['celery.timezone']
108
109 notifications_seconds = int(settings.get('tasks.notifications_reports.interval', 60))
110
111 celery_config['CELERYBEAT_SCHEDULE']['notifications'] = {
112 'task': 'appenlight.celery.tasks.notifications_reports',
113 'schedule': timedelta(seconds=notifications_seconds)
114 }
115
116 celery_config['CELERYBEAT_SCHEDULE']['daily_digest'] = {
117 'task': 'appenlight.celery.tasks.daily_digest',
118 'schedule': crontab(minute=1, hour='4,12,20')
119 }
120
106 121 if asbool(settings.get('celery.always_eager')):
107 122 celery_config['CELERY_ALWAYS_EAGER'] = True
108 123 celery_config['CELERY_EAGER_PROPAGATES_EXCEPTIONS'] = True
109 124
110 125 for plugin in pyramid_registry.appenlight_plugins.values():
111 126 if plugin.get('celery_tasks'):
112 127 celery_config['CELERY_IMPORTS'].extend(plugin['celery_tasks'])
113 128 if plugin.get('celery_beats'):
114 129 for name, config in plugin['celery_beats']:
115 130 celery_config['CELERYBEAT_SCHEDULE'][name] = config
116 131 celery.config_from_object(celery_config)
117 132
118 133
119 134 @task_prerun.connect
120 135 def task_prerun_signal(task_id, task, args, kwargs, **kwaargs):
121 136 if hasattr(celery, 'pyramid'):
122 137 env = celery.pyramid
123 138 env = prepare(registry=env['request'].registry)
124 139 proper_base_url = env['request'].registry.settings['mailing.app_url']
125 tmp_request = Request.blank('/', base_url=proper_base_url)
140 tmp_req = Request.blank('/', base_url=proper_base_url)
126 141 # ensure tasks generate url for right domain from config
127 env['request'].environ['HTTP_HOST'] = tmp_request.environ['HTTP_HOST']
128 env['request'].environ['SERVER_PORT'] = tmp_request.environ['SERVER_PORT']
129 env['request'].environ['SERVER_NAME'] = tmp_request.environ['SERVER_NAME']
130 env['request'].environ['wsgi.url_scheme'] = tmp_request.environ[
131 'wsgi.url_scheme']
142 env['request'].environ['HTTP_HOST'] = tmp_req.environ['HTTP_HOST']
143 env['request'].environ['SERVER_PORT'] = tmp_req.environ['SERVER_PORT']
144 env['request'].environ['SERVER_NAME'] = tmp_req.environ['SERVER_NAME']
145 env['request'].environ['wsgi.url_scheme'] = \
146 tmp_req.environ['wsgi.url_scheme']
132 147 get_current_request().tm.begin()
133 148
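
For reference, a small sketch (independent of AppEnlight, hostname made up) of what `Request.blank('/', base_url=...)` yields; copying these four environ keys onto the task request is what makes URL generation honor the `mailing.app_url` setting:

    from pyramid.request import Request

    tmp_req = Request.blank('/', base_url='https://appenlight.example.com')
    # typical values produced by WebOb for an https base_url:
    print(tmp_req.environ['HTTP_HOST'])        # 'appenlight.example.com'
    print(tmp_req.environ['SERVER_NAME'])      # 'appenlight.example.com'
    print(tmp_req.environ['SERVER_PORT'])      # '443'
    print(tmp_req.environ['wsgi.url_scheme'])  # 'https'
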
134 149
135 150 @task_success.connect
136 151 def task_success_signal(result, **kwargs):
137 152 get_current_request().tm.commit()
138 153 if hasattr(celery, 'pyramid'):
139 154 celery.pyramid["closer"]()
140 155
141 156
142 157 @task_retry.connect
143 158 def task_retry_signal(request, reason, einfo, **kwargs):
144 159 get_current_request().tm.abort()
145 160 if hasattr(celery, 'pyramid'):
146 161 celery.pyramid["closer"]()
147 162
148 163
149 164 @task_failure.connect
150 165 def task_failure_signal(task_id, exception, args, kwargs, traceback, einfo,
151 166 **kwaargs):
152 167 get_current_request().tm.abort()
153 168 if hasattr(celery, 'pyramid'):
154 169 celery.pyramid["closer"]()
155 170
156 171
157 172 @task_revoked.connect
158 173 def task_revoked_signal(request, terminated, signum, expired, **kwaargs):
159 174 get_current_request().tm.abort()
160 175 if hasattr(celery, 'pyramid'):
161 176 celery.pyramid["closer"]()
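
A hedged usage sketch of `configure_celery()` showing how the new `tasks.notifications_reports.interval` setting feeds the beat schedule. The registry object and its values are made up for illustration; only the setting keys come from the code above, and the import path assumes the hunk above is the `appenlight.celery` package module (it defines the `celery` app that the tasks module imports):

    from datetime import timedelta
    from appenlight.celery import configure_celery, celery_config

    class FakeRegistry(object):
        """Stand-in for a Pyramid registry; hypothetical values."""
        settings = {
            'celery.broker_url': 'redis://localhost:6379/3',
            'celery.concurrency': 2,
            'celery.timezone': 'UTC',
            # override the default 60 second notification interval
            'tasks.notifications_reports.interval': 30,
        }
        appenlight_plugins = {}  # no plugins registered in this sketch

    configure_celery(FakeRegistry())
    assert (celery_config['CELERYBEAT_SCHEDULE']['notifications']['schedule']
            == timedelta(seconds=30))
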
@@ -1,610 +1,662 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # App Enlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 import bisect
23 23 import collections
24 24 import math
25 25 from datetime import datetime, timedelta
26 26
27 27 import sqlalchemy as sa
28 28 import pyelasticsearch
29 29
30 30 from celery.utils.log import get_task_logger
31 31 from zope.sqlalchemy import mark_changed
32 32 from pyramid.threadlocal import get_current_request, get_current_registry
33 33 from appenlight.celery import celery
34 34 from appenlight.models.report_group import ReportGroup
35 35 from appenlight.models import DBSession, Datastores
36 36 from appenlight.models.report import Report
37 37 from appenlight.models.log import Log
38 38 from appenlight.models.request_metric import Metric
39 39 from appenlight.models.event import Event
40 40
41 41 from appenlight.models.services.application import ApplicationService
42 42 from appenlight.models.services.event import EventService
43 43 from appenlight.models.services.log import LogService
44 44 from appenlight.models.services.report import ReportService
45 45 from appenlight.models.services.report_group import ReportGroupService
46 46 from appenlight.models.services.user import UserService
47 47 from appenlight.models.tag import Tag
48 48 from appenlight.lib import print_traceback
49 49 from appenlight.lib.utils import parse_proto, in_batches
50 50 from appenlight.lib.ext_json import json
51 51 from appenlight.lib.redis_keys import REDIS_KEYS
52 52 from appenlight.lib.enums import ReportType
53 53
54 54 log = get_task_logger(__name__)
55 55
56 56 sample_boundries = list(range(100, 10000, 100))
57 57
58 58
59 59 def pick_sample(total_occurences, report_type=1):
60 60 every = 1.0
61 61 position = bisect.bisect_left(sample_boundries, total_occurences)
62 62 if position > 0:
63 63 # 404
64 64 if report_type == 2:
65 65 divide = 10.0
66 66 else:
67 67 divide = 100.0
68 68 every = sample_boundries[position - 1] / divide
69 69 return total_occurences % every == 0
70 70
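
A few worked examples of the sampling behaviour of `pick_sample()` above, run in the context of this module (`report_type=2` corresponds to 404-style reports per the inline comment):

    assert pick_sample(50)                       # below 100 every report is stored
    assert pick_sample(1050)                     # errors past 1000: roughly every 10th kept
    assert not pick_sample(1051)                 # 1051 % (1000 / 100.0) != 0
    assert not pick_sample(1050, report_type=2)  # 404s past 1000: roughly every 100th kept
    assert pick_sample(1100, report_type=2)      # 1100 % (1000 / 10.0) == 0
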
71 71
72 72 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
73 73 def test_exception_task():
74 74 log.error('test celery log', extra={'location': 'celery'})
75 75 log.warning('test celery log', extra={'location': 'celery'})
76 76 raise Exception('Celery exception test')
77 77
78 78
79 79 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
80 80 def test_retry_exception_task():
81 81 try:
82 82 import time
83 83
84 84 time.sleep(1.3)
85 85 log.error('test retry celery log', extra={'location': 'celery'})
86 86 log.warning('test retry celery log', extra={'location': 'celery'})
87 87 raise Exception('Celery exception test')
88 88 except Exception as exc:
89 89 test_retry_exception_task.retry(exc=exc)
90 90
91 91
92 92 @celery.task(queue="reports", default_retry_delay=600, max_retries=999)
93 93 def add_reports(resource_id, params, dataset, environ=None, **kwargs):
94 94 proto_version = parse_proto(params.get('protocol_version', ''))
95 95 current_time = datetime.utcnow().replace(second=0, microsecond=0)
96 96 try:
97 97 # we will store solr docs here for single insert
98 98 es_report_docs = {}
99 99 es_report_group_docs = {}
100 100 resource = ApplicationService.by_id(resource_id)
101 101 reports = []
102 102
103 103 if proto_version.major < 1 and proto_version.minor < 5:
104 104 for report_data in dataset:
105 105 report_details = report_data.get('report_details', [])
106 106 for i, detail_data in enumerate(report_details):
107 107 report_data.update(detail_data)
108 108 report_data.pop('report_details')
109 109 traceback = report_data.get('traceback')
110 110 if traceback is None:
111 111 report_data['traceback'] = report_data.get('frameinfo')
112 112 # for 0.3 api
113 113 error = report_data.pop('error_type', '')
114 114 if error:
115 115 report_data['error'] = error
116 116 if proto_version.minor < 4:
117 117 # convert "Unknown" slow reports to
118 118 # '' (from older clients)
119 119 if (report_data['error'] and
120 120 report_data['http_status'] < 500):
121 121 report_data['error'] = ''
122 122 message = report_data.get('message')
123 123 if 'extra' not in report_data:
124 124 report_data['extra'] = []
125 125 if message:
126 126 report_data['extra'] = [('message', message), ]
127 127 reports.append(report_data)
128 128 else:
129 129 reports = dataset
130 130
131 131 tags = []
132 132 es_slow_calls_docs = {}
133 133 es_reports_stats_rows = {}
134 134 for report_data in reports:
135 135 # build report details for later
136 136 added_details = 0
137 137 report = Report()
138 138 report.set_data(report_data, resource, proto_version)
139 139 report._skip_ft_index = True
140 140
141 141 report_group = ReportGroupService.by_hash_and_resource(
142 142 report.resource_id,
143 143 report.grouping_hash
144 144 )
145 145 occurences = report_data.get('occurences', 1)
146 146 if not report_group:
147 147 # total reports will be +1 moment later
148 148 report_group = ReportGroup(grouping_hash=report.grouping_hash,
149 149 occurences=0, total_reports=0,
150 150 last_report=0,
151 151 priority=report.priority,
152 152 error=report.error,
153 153 first_timestamp=report.start_time)
154 154 report_group._skip_ft_index = True
155 155 report_group.report_type = report.report_type
156 156 report.report_group_time = report_group.first_timestamp
157 157 add_sample = pick_sample(report_group.occurences,
158 158 report_type=report_group.report_type)
159 159 if add_sample:
160 160 resource.report_groups.append(report_group)
161 161 report_group.reports.append(report)
162 162 added_details += 1
163 163 DBSession.flush()
164 164 if report.partition_id not in es_report_docs:
165 165 es_report_docs[report.partition_id] = []
166 166 es_report_docs[report.partition_id].append(report.es_doc())
167 167 tags.extend(list(report.tags.items()))
168 168 slow_calls = report.add_slow_calls(report_data, report_group)
169 169 DBSession.flush()
170 170 for s_call in slow_calls:
171 171 if s_call.partition_id not in es_slow_calls_docs:
172 172 es_slow_calls_docs[s_call.partition_id] = []
173 173 es_slow_calls_docs[s_call.partition_id].append(
174 174 s_call.es_doc())
175 175 # try generating new stat rows if needed
176 176 else:
177 177 # required for postprocessing to not fail later
178 178 report.report_group = report_group
179 179
180 180 stat_row = ReportService.generate_stat_rows(
181 181 report, resource, report_group)
182 182 if stat_row.partition_id not in es_reports_stats_rows:
183 183 es_reports_stats_rows[stat_row.partition_id] = []
184 184 es_reports_stats_rows[stat_row.partition_id].append(
185 185 stat_row.es_doc())
186 186
187 187 # see if we should mark 10th occurence of report
188 188 last_occurences_10 = int(math.floor(report_group.occurences / 10))
189 189 curr_occurences_10 = int(math.floor(
190 190 (report_group.occurences + report.occurences) / 10))
191 191 last_occurences_100 = int(
192 192 math.floor(report_group.occurences / 100))
193 193 curr_occurences_100 = int(math.floor(
194 194 (report_group.occurences + report.occurences) / 100))
195 195 notify_occurences_10 = last_occurences_10 != curr_occurences_10
196 196 notify_occurences_100 = last_occurences_100 != curr_occurences_100
197 197 report_group.occurences = ReportGroup.occurences + occurences
198 198 report_group.last_timestamp = report.start_time
199 199 report_group.summed_duration = ReportGroup.summed_duration + report.duration
200 200 summed_duration = ReportGroup.summed_duration + report.duration
201 201 summed_occurences = ReportGroup.occurences + occurences
202 202 report_group.average_duration = summed_duration / summed_occurences
203 203 report_group.run_postprocessing(report)
204 204 if added_details:
205 205 report_group.total_reports = ReportGroup.total_reports + 1
206 206 report_group.last_report = report.id
207 207 report_group.set_notification_info(notify_10=notify_occurences_10,
208 208 notify_100=notify_occurences_100)
209 209 DBSession.flush()
210 210 report_group.get_report().notify_channel(report_group)
211 211 if report_group.partition_id not in es_report_group_docs:
212 212 es_report_group_docs[report_group.partition_id] = []
213 213 es_report_group_docs[report_group.partition_id].append(
214 214 report_group.es_doc())
215 215
216 216 action = 'REPORT'
217 217 log_msg = '%s: %s %s, client: %s, proto: %s' % (
218 218 action,
219 219 report_data.get('http_status', 'unknown'),
220 220 str(resource),
221 221 report_data.get('client'),
222 222 proto_version)
223 223 log.info(log_msg)
224 224 total_reports = len(dataset)
225 225 key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time)
226 226 Datastores.redis.incr(key, total_reports)
227 227 Datastores.redis.expire(key, 3600 * 24)
228 228 key = REDIS_KEYS['counters']['reports_per_minute_per_app'].format(
229 229 resource_id, current_time)
230 230 Datastores.redis.incr(key, total_reports)
231 231 Datastores.redis.expire(key, 3600 * 24)
232 232
233 233 add_reports_es(es_report_group_docs, es_report_docs)
234 234 add_reports_slow_calls_es(es_slow_calls_docs)
235 235 add_reports_stats_rows_es(es_reports_stats_rows)
236 236 return True
237 237 except Exception as exc:
238 238 print_traceback(log)
239 239 add_reports.retry(exc=exc)
240 240
241 241
242 242 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
243 243 def add_reports_es(report_group_docs, report_docs):
244 244 for k, v in report_group_docs.items():
245 245 Datastores.es.bulk_index(k, 'report_group', v, id_field="_id")
246 246 for k, v in report_docs.items():
247 247 Datastores.es.bulk_index(k, 'report', v, id_field="_id",
248 248 parent_field='_parent')
249 249
250 250
251 251 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
252 252 def add_reports_slow_calls_es(es_docs):
253 253 for k, v in es_docs.items():
254 254 Datastores.es.bulk_index(k, 'log', v)
255 255
256 256
257 257 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
258 258 def add_reports_stats_rows_es(es_docs):
259 259 for k, v in es_docs.items():
260 260 Datastores.es.bulk_index(k, 'log', v)
261 261
262 262
263 263 @celery.task(queue="logs", default_retry_delay=600, max_retries=999)
264 264 def add_logs(resource_id, request, dataset, environ=None, **kwargs):
265 265 proto_version = request.get('protocol_version')
266 266 current_time = datetime.utcnow().replace(second=0, microsecond=0)
267 267
268 268 try:
269 269 es_docs = collections.defaultdict(list)
270 270 application = ApplicationService.by_id(resource_id)
271 271 ns_pairs = []
272 272 for entry in dataset:
273 273 # gather pk and ns so we can remove older versions of row later
274 274 if entry['primary_key'] is not None:
275 275 ns_pairs.append({"pk": entry['primary_key'],
276 276 "ns": entry['namespace']})
277 277 log_entry = Log()
278 278 log_entry.set_data(entry, resource=application)
279 279 log_entry._skip_ft_index = True
280 280 application.logs.append(log_entry)
281 281 DBSession.flush()
282 282 # insert non pk rows first
283 283 if entry['primary_key'] is None:
284 284 es_docs[log_entry.partition_id].append(log_entry.es_doc())
285 285
286 286 # 2nd pass to delete all log entries from db for same pk/ns pair
287 287 if ns_pairs:
288 288 ids_to_delete = []
289 289 es_docs = collections.defaultdict(list)
290 290 es_docs_to_delete = collections.defaultdict(list)
291 291 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
292 292 list_of_pairs=ns_pairs)
293 293 log_dict = {}
294 294 for log_entry in found_pkey_logs:
295 295 log_key = (log_entry.primary_key, log_entry.namespace)
296 296 if log_key not in log_dict:
297 297 log_dict[log_key] = []
298 298 log_dict[log_key].append(log_entry)
299 299
300 300 for ns, entry_list in log_dict.items():
301 301 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
302 302 # newest row needs to be indexed in es
303 303 log_entry = entry_list[-1]
304 304 # delete everything from pg and ES, leave the last row in pg
305 305 for e in entry_list[:-1]:
306 306 ids_to_delete.append(e.log_id)
307 307 es_docs_to_delete[e.partition_id].append(e.delete_hash)
308 308
309 309 es_docs_to_delete[log_entry.partition_id].append(
310 310 log_entry.delete_hash)
311 311
312 312 es_docs[log_entry.partition_id].append(log_entry.es_doc())
313 313
314 314 if ids_to_delete:
315 315 query = DBSession.query(Log).filter(
316 316 Log.log_id.in_(ids_to_delete))
317 317 query.delete(synchronize_session=False)
318 318 if es_docs_to_delete:
319 319 # batch this to avoid problems with default ES bulk limits
320 320 for es_index in es_docs_to_delete.keys():
321 321 for batch in in_batches(es_docs_to_delete[es_index], 20):
322 322 query = {'terms': {'delete_hash': batch}}
323 323
324 324 try:
325 325 Datastores.es.delete_by_query(
326 326 es_index, 'log', query)
327 327 except pyelasticsearch.ElasticHttpNotFoundError as exc:
328 328 log.error(exc)
329 329
330 330 total_logs = len(dataset)
331 331
332 332 log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % (
333 333 str(application),
334 334 total_logs,
335 335 proto_version)
336 336 log.info(log_msg)
337 337 # mark_changed(session)
338 338 key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time)
339 339 Datastores.redis.incr(key, total_logs)
340 340 Datastores.redis.expire(key, 3600 * 24)
341 341 key = REDIS_KEYS['counters']['logs_per_minute_per_app'].format(
342 342 resource_id, current_time)
343 343 Datastores.redis.incr(key, total_logs)
344 344 Datastores.redis.expire(key, 3600 * 24)
345 345 add_logs_es(es_docs)
346 346 return True
347 347 except Exception as exc:
348 348 print_traceback(log)
349 349 add_logs.retry(exc=exc)
350 350
351 351
352 352 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
353 353 def add_logs_es(es_docs):
354 354 for k, v in es_docs.items():
355 355 Datastores.es.bulk_index(k, 'log', v)
356 356
357 357
358 358 @celery.task(queue="metrics", default_retry_delay=600, max_retries=999)
359 359 def add_metrics(resource_id, request, dataset, proto_version):
360 360 current_time = datetime.utcnow().replace(second=0, microsecond=0)
361 361 try:
362 362 application = ApplicationService.by_id_cached()(resource_id)
363 363 application = DBSession.merge(application, load=False)
364 364 es_docs = []
365 365 rows = []
366 366 for metric in dataset:
367 367 tags = dict(metric['tags'])
368 368 server_n = tags.get('server_name', metric['server_name']).lower()
369 369 tags['server_name'] = server_n or 'unknown'
370 370 new_metric = Metric(
371 371 timestamp=metric['timestamp'],
372 372 resource_id=application.resource_id,
373 373 namespace=metric['namespace'],
374 374 tags=tags)
375 375 rows.append(new_metric)
376 376 es_docs.append(new_metric.es_doc())
377 377 session = DBSession()
378 378 session.bulk_save_objects(rows)
379 379 session.flush()
380 380
381 381 action = 'METRICS'
382 382 metrics_msg = '%s: %s, metrics: %s, proto:%s' % (
383 383 action,
384 384 str(application),
385 385 len(dataset),
386 386 proto_version
387 387 )
388 388 log.info(metrics_msg)
389 389
390 390 mark_changed(session)
391 391 key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time)
392 392 Datastores.redis.incr(key, len(rows))
393 393 Datastores.redis.expire(key, 3600 * 24)
394 394 key = REDIS_KEYS['counters']['metrics_per_minute_per_app'].format(
395 395 resource_id, current_time)
396 396 Datastores.redis.incr(key, len(rows))
397 397 Datastores.redis.expire(key, 3600 * 24)
398 398 add_metrics_es(es_docs)
399 399 return True
400 400 except Exception as exc:
401 401 print_traceback(log)
402 402 add_metrics.retry(exc=exc)
403 403
404 404
405 405 @celery.task(queue="es", default_retry_delay=600, max_retries=999)
406 406 def add_metrics_es(es_docs):
407 407 for doc in es_docs:
408 408 partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d')
409 409 Datastores.es.index(partition, 'log', doc)
410 410
411 411
412 412 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
413 def check_user_report_notifications(resource_id, since_when=None):
413 def check_user_report_notifications(resource_id):
414 since_when = datetime.utcnow()
414 415 try:
415 416 request = get_current_request()
416 417 application = ApplicationService.by_id(resource_id)
417 418 if not application:
418 419 return
419 420 error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
420 421 ReportType.error, resource_id)
421 422 slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
422 423 ReportType.slow, resource_id)
423 424 error_group_ids = Datastores.redis.smembers(error_key)
424 425 slow_group_ids = Datastores.redis.smembers(slow_key)
425 426 Datastores.redis.delete(error_key)
426 427 Datastores.redis.delete(slow_key)
427 428 err_gids = [int(g_id) for g_id in error_group_ids]
428 429 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
429 430 group_ids = err_gids + slow_gids
430 431 occurence_dict = {}
431 432 for g_id in group_ids:
432 433 key = REDIS_KEYS['counters']['report_group_occurences'].format(
433 434 g_id)
434 435 val = Datastores.redis.get(key)
435 436 Datastores.redis.delete(key)
436 437 if val:
437 438 occurence_dict[g_id] = int(val)
438 439 else:
439 440 occurence_dict[g_id] = 1
440 441 report_groups = ReportGroupService.by_ids(group_ids)
441 442 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
442 443
443 444 ApplicationService.check_for_groups_alert(
444 445 application, 'alert', report_groups=report_groups,
445 occurence_dict=occurence_dict, since_when=since_when)
446 occurence_dict=occurence_dict)
446 447 users = set([p.user for p in application.users_for_perm('view')])
447 448 report_groups = report_groups.all()
448 449 for user in users:
449 450 UserService.report_notify(user, request, application,
450 451 report_groups=report_groups,
451 occurence_dict=occurence_dict,
452 since_when=since_when)
452 occurence_dict=occurence_dict)
453 453 for group in report_groups:
454 454 # marks report_groups as notified
455 455 if not group.notified:
456 456 group.notified = True
457 457 except Exception as exc:
458 458 print_traceback(log)
459 459 raise
460 460
461 461
462 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
463 def check_alerts(resource_id):
464 since_when = datetime.utcnow()
465 try:
466 request = get_current_request()
467 application = ApplicationService.by_id(resource_id)
468 if not application:
469 return
470 error_key = REDIS_KEYS[
471 'reports_to_notify_per_type_per_app_alerting'].format(
472 ReportType.error, resource_id)
473 slow_key = REDIS_KEYS[
474 'reports_to_notify_per_type_per_app_alerting'].format(
475 ReportType.slow, resource_id)
476 error_group_ids = Datastores.redis.smembers(error_key)
477 slow_group_ids = Datastores.redis.smembers(slow_key)
478 Datastores.redis.delete(error_key)
479 Datastores.redis.delete(slow_key)
480 err_gids = [int(g_id) for g_id in error_group_ids]
481 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
482 group_ids = err_gids + slow_gids
483 occurence_dict = {}
484 for g_id in group_ids:
485 key = REDIS_KEYS['counters'][
486 'report_group_occurences_alerting'].format(
487 g_id)
488 val = Datastores.redis.get(key)
489 Datastores.redis.delete(key)
490 if val:
491 occurence_dict[g_id] = int(val)
492 else:
493 occurence_dict[g_id] = 1
494 report_groups = ReportGroupService.by_ids(group_ids)
495 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
496
497 ApplicationService.check_for_groups_alert(
498 application, 'alert', report_groups=report_groups,
499 occurence_dict=occurence_dict, since_when=since_when)
500 except Exception as exc:
501 print_traceback(log)
502 raise
503
504
462 505 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
463 def close_alerts(since_when=None):
506 def close_alerts():
464 507 log.warning('Checking alerts')
508 since_when = datetime.utcnow()
465 509 try:
466 510 event_types = [Event.types['error_report_alert'],
467 511 Event.types['slow_report_alert'], ]
468 512 statuses = [Event.statuses['active']]
469 513 # get events older than 5 min
470 514 events = EventService.by_type_and_status(
471 515 event_types,
472 516 statuses,
473 517 older_than=(since_when - timedelta(minutes=5)))
474 518 for event in events:
475 519 # see if we can close them
476 520 event.validate_or_close(
477 521 since_when=(since_when - timedelta(minutes=1)))
478 522 except Exception as exc:
479 523 print_traceback(log)
480 524 raise
481 525
482 526
483 527 @celery.task(queue="default", default_retry_delay=600, max_retries=999)
484 528 def update_tag_counter(tag_name, tag_value, count):
485 529 try:
486 530 query = DBSession.query(Tag).filter(Tag.name == tag_name).filter(
487 531 sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value),
488 532 sa.types.TEXT))
489 533 query.update({'times_seen': Tag.times_seen + count,
490 534 'last_timestamp': datetime.utcnow()},
491 535 synchronize_session=False)
492 536 session = DBSession()
493 537 mark_changed(session)
494 538 return True
495 539 except Exception as exc:
496 540 print_traceback(log)
497 541 update_tag_counter.retry(exc=exc)
498 542
499 543
500 544 @celery.task(queue="default")
501 545 def update_tag_counters():
502 546 """
503 547 Sets task to update counters for application tags
504 548 """
505 549 tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1)
506 550 Datastores.redis.delete(REDIS_KEYS['seen_tag_list'])
507 551 c = collections.Counter(tags)
508 552 for t_json, count in c.items():
509 553 tag_info = json.loads(t_json)
510 554 update_tag_counter.delay(tag_info[0], tag_info[1], count)
511 555
512 556
513 557 @celery.task(queue="default")
514 558 def daily_digest():
515 559 """
516 560 Sends daily digest with top 50 error reports
517 561 """
518 562 request = get_current_request()
519 563 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
520 564 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
521 565 since_when = datetime.utcnow() - timedelta(hours=8)
522 566 log.warning('Generating daily digests')
523 567 for resource_id in apps:
524 568 resource_id = resource_id.decode('utf8')
525 569 end_date = datetime.utcnow().replace(microsecond=0, second=0)
526 570 filter_settings = {'resource': [resource_id],
527 571 'tags': [{'name': 'type',
528 572 'value': ['error'], 'op': None}],
529 573 'type': 'error', 'start_date': since_when,
530 574 'end_date': end_date}
531 575
532 576 reports = ReportGroupService.get_trending(
533 577 request, filter_settings=filter_settings, limit=50)
534 578
535 579 application = ApplicationService.by_id(resource_id)
536 580 if application:
537 581 users = set([p.user for p in application.users_for_perm('view')])
538 582 for user in users:
539 583 user.send_digest(request, application, reports=reports,
540 584 since_when=since_when)
541 585
542 586
543 587 @celery.task(queue="default")
544 def alerting():
588 def notifications_reports():
545 589 """
546 590 Loop that checks redis for info and then issues new tasks to celery to
547 perform the following:
548 - which applications should have new alerts opened
549 - which currently opened alerts should be closed
591 issue notifications
550 592 """
551 start_time = datetime.utcnow()
552 # transactions are needed for mailer
553 593 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports'])
554 594 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports'])
555 595 for app in apps:
556 596 log.warning('Notify for app: %s' % app)
557 597 check_user_report_notifications.delay(app.decode('utf8'))
558 # clear app ids from set
559 close_alerts.delay(since_when=start_time)
598
599 @celery.task(queue="default")
600 def alerting_reports():
601 """
602 Loop that checks redis for info and then issues new tasks to celery to
603 perform the following:
604 - which applications should have new alerts opened
605 """
606
607 apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting'])
608 Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting'])
609 for app in apps:
610 log.warning('Notify for app: %s' % app)
611 check_alerts.delay(app.decode('utf8'))
560 612
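
A hedged sketch of the redis hand-off these two loops rely on, using the plain redis-py client in place of `Datastores.redis` and a made-up resource id; the key names come from the `redis_keys.py` hunk further down in this changeset:

    import redis
    from appenlight.lib.redis_keys import REDIS_KEYS

    r = redis.StrictRedis()  # assumption: the same instance Datastores.redis wraps

    # producer side: ReportGroup.set_notification_info() marks the app in both sets
    r.sadd(REDIS_KEYS['apps_that_had_reports'], 510)
    r.sadd(REDIS_KEYS['apps_that_had_reports_alerting'], 510)

    # notifications_reports() drains only the first set and spawns
    # check_user_report_notifications per app; alerting_reports() drains only the
    # second set and spawns check_alerts, so e-mail notifications and alert
    # evaluation no longer share counters or membership state
    print(r.smembers(REDIS_KEYS['apps_that_had_reports']))
    print(r.smembers(REDIS_KEYS['apps_that_had_reports_alerting']))
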
561 613
562 @celery.task(queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4,
563 max_retries=999)
614 @celery.task(queue="default", soft_time_limit=3600 * 4,
615 hard_time_limit=3600 * 4, max_retries=999)
564 616 def logs_cleanup(resource_id, filter_settings):
565 617 request = get_current_request()
566 618 request.tm.begin()
567 619 es_query = {
568 620 "_source": False,
569 621 "size": 5000,
570 622 "query": {
571 623 "filtered": {
572 624 "filter": {
573 625 "and": [{"term": {"resource_id": resource_id}}]
574 626 }
575 627 }
576 628 }
577 629 }
578 630
579 631 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
580 632 if filter_settings['namespace']:
581 633 query = query.filter(Log.namespace == filter_settings['namespace'][0])
582 634 es_query['query']['filtered']['filter']['and'].append(
583 635 {"term": {"namespace": filter_settings['namespace'][0]}}
584 636 )
585 637 query.delete(synchronize_session=False)
586 638 request.tm.commit()
587 639 result = request.es_conn.search(es_query, index='rcae_l_*',
588 640 doc_type='log', es_scroll='1m',
589 641 es_search_type='scan')
590 642 scroll_id = result['_scroll_id']
591 643 while True:
592 644 log.warning('log_cleanup, app:{} ns:{} batch'.format(
593 645 resource_id,
594 646 filter_settings['namespace']
595 647 ))
596 648 es_docs_to_delete = []
597 649 result = request.es_conn.send_request(
598 650 'POST', ['_search', 'scroll'],
599 651 body=scroll_id, query_params={"scroll": '1m'})
600 652 scroll_id = result['_scroll_id']
601 653 if not result['hits']['hits']:
602 654 break
603 655 for doc in result['hits']['hits']:
604 656 es_docs_to_delete.append({"id": doc['_id'],
605 657 "index": doc['_index']})
606 658
607 659 for batch in in_batches(es_docs_to_delete, 10):
608 660 Datastores.es.bulk([Datastores.es.delete_op(doc_type='log',
609 661 **to_del)
610 662 for to_del in batch])
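
For reference, when `filter_settings['namespace']` is, say, `['myapp.views']` (resource id and namespace are made-up values), the query body that `logs_cleanup` above ends up sending to Elasticsearch, and then scrolls through in 1m batches, looks like this:

    es_query = {
        "_source": False,
        "size": 5000,
        "query": {
            "filtered": {
                "filter": {
                    "and": [
                        {"term": {"resource_id": 7}},
                        {"term": {"namespace": "myapp.views"}},
                    ]
                }
            }
        }
    }
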
@@ -1,53 +1,55 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # App Enlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 """Miscellaneous support packages for {{project}}.
23 23 """
24 24 import random
25 25 import string
26 26 import importlib
27 27
28 28 from appenlight_client.exceptions import get_current_traceback
29 29
30 30
31 31 def generate_random_string(chars=10):
32 32 return ''.join(random.sample(string.ascii_letters * 2 + string.digits,
33 33 chars))
34 34
35 35
36 36 def to_integer_safe(input):
37 37 try:
38 38 return int(input)
39 39 except (TypeError, ValueError,):
40 40 return None
41 41
42
42 43 def print_traceback(log):
43 44 traceback = get_current_traceback(skip=1, show_hidden_frames=True,
44 45 ignore_system_exceptions=True)
45 46 exception_text = traceback.exception
46 47 log.error(exception_text)
47 48 log.error(traceback.plaintext)
48 49 del traceback
49 50
51
50 52 def get_callable(import_string):
51 53 import_module, indexer_callable = import_string.split(':')
52 54 return getattr(importlib.import_module(import_module),
53 55 indexer_callable)
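
A couple of worked examples for the helpers above, assuming the module is importable as `appenlight.lib` (that is how `print_traceback` is imported by the tasks module earlier in this changeset):

    from appenlight.lib import (to_integer_safe, get_callable,
                                generate_random_string)

    assert to_integer_safe('42') == 42
    assert to_integer_safe('not-a-number') is None
    assert len(generate_random_string(chars=12)) == 12
    # resolves a 'module:attribute' import string to the callable it names
    fn = get_callable('appenlight.lib:print_traceback')
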
@@ -1,59 +1,67 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # App Enlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 BASE = 'appenlight:data:{}'
23 23
24 24 REDIS_KEYS = {
25 25 'tasks': {
26 26 'add_reports_lock': BASE.format('add_reports_lock:{}'),
27 27 'add_logs_lock': BASE.format('add_logs_lock:{}'),
28 28 },
29 29 'counters': {
30 30 'reports_per_minute': BASE.format('reports_per_minute:{}'),
31 31 'reports_per_minute_per_app': BASE.format(
32 32 'reports_per_minute_per_app:{}:{}'),
33 33 'reports_per_type': BASE.format('reports_per_type:{}'),
34 34 'logs_per_minute': BASE.format('logs_per_minute:{}'),
35 35 'logs_per_minute_per_app': BASE.format(
36 36 'logs_per_minute_per_app:{}:{}'),
37 37 'metrics_per_minute': BASE.format('metrics_per_minute:{}'),
38 38 'metrics_per_minute_per_app': BASE.format(
39 39 'metrics_per_minute_per_app:{}:{}'),
40 40 'report_group_occurences': BASE.format('report_group_occurences:{}'),
41 'report_group_occurences_alerting': BASE.format(
42 'report_group_occurences_alerting:{}'),
41 43 'report_group_occurences_10th': BASE.format(
42 44 'report_group_occurences_10th:{}'),
43 45 'report_group_occurences_100th': BASE.format(
44 46 'report_group_occurences_100th:{}'),
45 47 },
46 48 'rate_limits': {
47 49 'per_application_reports_rate_limit': BASE.format(
48 50 'per_application_reports_limit:{}:{}'),
49 51 'per_application_logs_rate_limit': BASE.format(
50 52 'per_application_logs_rate_limit:{}:{}'),
51 53 'per_application_metrics_rate_limit': BASE.format(
52 54 'per_application_metrics_rate_limit:{}:{}'),
53 55 },
54 56 'apps_that_had_reports': BASE.format('apps_that_had_reports'),
55 57 'apps_that_had_error_reports': BASE.format('apps_that_had_error_reports'),
58 'apps_that_had_reports_alerting': BASE.format(
59 'apps_that_had_reports_alerting'),
60 'apps_that_had_error_reports_alerting': BASE.format(
61 'apps_that_had_error_reports_alerting'),
56 62 'reports_to_notify_per_type_per_app': BASE.format(
57 63 'reports_to_notify_per_type_per_app:{}:{}'),
64 'reports_to_notify_per_type_per_app_alerting': BASE.format(
65 'reports_to_notify_per_type_per_app_alerting:{}:{}'),
58 66 'seen_tag_list': BASE.format('seen_tag_list')
59 67 }
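
For reference, the concrete keys produced by the new alerting templates above, using a made-up report group id of 42 and resource id of 7:

    from appenlight.lib.redis_keys import REDIS_KEYS

    assert (REDIS_KEYS['counters']['report_group_occurences_alerting'].format(42)
            == 'appenlight:data:report_group_occurences_alerting:42')
    assert (REDIS_KEYS['apps_that_had_reports_alerting']
            == 'appenlight:data:apps_that_had_reports_alerting')
    # in the tasks the first argument is a ReportType member, not a plain string
    assert (REDIS_KEYS['reports_to_notify_per_type_per_app_alerting']
            .format('error', 7)
            == 'appenlight:data:reports_to_notify_per_type_per_app_alerting:error:7')
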
@@ -1,251 +1,264 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # App Enlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 import logging
23 23 import sqlalchemy as sa
24 24
25 25 from datetime import datetime
26 26
27 27 from pyramid.threadlocal import get_current_request
28 28 from sqlalchemy.dialects.postgresql import JSON
29 29 from ziggurat_foundations.models.base import BaseModel
30 30
31 31 from appenlight.models import Base, get_db_session, Datastores
32 32 from appenlight.lib.enums import ReportType
33 33 from appenlight.lib.rule import Rule
34 34 from appenlight.lib.redis_keys import REDIS_KEYS
35 35 from appenlight.models.report import REPORT_TYPE_MATRIX
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39
40 40 class ReportGroup(Base, BaseModel):
41 41 __tablename__ = 'reports_groups'
42 42 __table_args__ = {'implicit_returning': False}
43 43
44 44 id = sa.Column(sa.BigInteger(), nullable=False, primary_key=True)
45 45 resource_id = sa.Column(sa.Integer(),
46 46 sa.ForeignKey('applications.resource_id',
47 47 onupdate='CASCADE',
48 48 ondelete='CASCADE'),
49 49 nullable=False,
50 50 index=True)
51 51 priority = sa.Column(sa.Integer, nullable=False, index=True, default=5,
52 52 server_default='5')
53 53 first_timestamp = sa.Column(sa.DateTime(), default=datetime.utcnow,
54 54 server_default=sa.func.now())
55 55 last_timestamp = sa.Column(sa.DateTime(), default=datetime.utcnow,
56 56 server_default=sa.func.now())
57 57 error = sa.Column(sa.UnicodeText(), index=True)
58 58 grouping_hash = sa.Column(sa.String(40), default='')
59 59 triggered_postprocesses_ids = sa.Column(JSON(), nullable=False,
60 60 default=list)
61 61 report_type = sa.Column(sa.Integer, default=1)
62 62 total_reports = sa.Column(sa.Integer, default=1)
63 63 last_report = sa.Column(sa.Integer)
64 64 occurences = sa.Column(sa.Integer, default=1)
65 65 average_duration = sa.Column(sa.Float, default=0)
66 66 summed_duration = sa.Column(sa.Float, default=0)
67 67 read = sa.Column(sa.Boolean(), index=True, default=False)
68 68 fixed = sa.Column(sa.Boolean(), index=True, default=False)
69 69 notified = sa.Column(sa.Boolean(), index=True, default=False)
70 70 public = sa.Column(sa.Boolean(), index=True, default=False)
71 71
72 72 reports = sa.orm.relationship('Report',
73 73 lazy='dynamic',
74 74 backref='report_group',
75 75 cascade="all, delete-orphan",
76 76 passive_deletes=True,
77 77 passive_updates=True, )
78 78
79 79 comments = sa.orm.relationship('ReportComment',
80 80 lazy='dynamic',
81 81 backref='report',
82 82 cascade="all, delete-orphan",
83 83 passive_deletes=True,
84 84 passive_updates=True,
85 85 order_by="ReportComment.comment_id")
86 86
87 87 assigned_users = sa.orm.relationship('User',
88 88 backref=sa.orm.backref(
89 89 'assigned_reports_relation',
90 90 lazy='dynamic',
91 91 order_by=sa.desc(
92 92 "reports_groups.id")
93 93 ),
94 94 passive_deletes=True,
95 95 passive_updates=True,
96 96 secondary='reports_assignments',
97 97 order_by="User.user_name")
98 98
99 99 stats = sa.orm.relationship('ReportStat',
100 100 lazy='dynamic',
101 101 backref='report',
102 102 passive_deletes=True,
103 103 passive_updates=True, )
104 104
105 105 last_report_ref = sa.orm.relationship('Report',
106 106 uselist=False,
107 107 primaryjoin="ReportGroup.last_report "
108 108 "== Report.id",
109 109 foreign_keys="Report.id",
110 110 cascade="all, delete-orphan",
111 111 passive_deletes=True,
112 112 passive_updates=True, )
113 113
114 114 def __repr__(self):
115 115 return '<ReportGroup id:{}>'.format(self.id)
116 116
117 117 def get_report(self, report_id=None, public=False):
118 118 """
119 119 Gets report with specific id or latest report if id was not specified
120 120 """
121 121 from .report import Report
122 122
123 123 if not report_id:
124 124 return self.last_report_ref
125 125 else:
126 126 return self.reports.filter(Report.id == report_id).first()
127 127
128 128 def get_public_url(self, request, _app_url=None):
129 129 url = request.route_url('/', _app_url=_app_url)
130 130 return (url + 'ui/report/%s') % self.id
131 131
132 132 def run_postprocessing(self, report):
133 133 """
134 134 Alters report group priority based on postprocessing configuration
135 135 """
136 136 request = get_current_request()
137 137 get_db_session(None, self).flush()
138 138 for action in self.application.postprocess_conf:
139 139 get_db_session(None, self).flush()
140 140 rule_obj = Rule(action.rule, REPORT_TYPE_MATRIX)
141 141 report_dict = report.get_dict(request)
142 142 # if was not processed yet
143 143 if (rule_obj.match(report_dict) and
144 144 action.pkey not in self.triggered_postprocesses_ids):
145 145 action.postprocess(self)
146 146 # this way sqla can track mutation of list
147 147 self.triggered_postprocesses_ids = \
148 148 self.triggered_postprocesses_ids + [action.pkey]
149 149
150 150 get_db_session(None, self).flush()
151 151 # do not go out of bounds
152 152 if self.priority < 1:
153 153 self.priority = 1
154 154 if self.priority > 10:
155 155 self.priority = 10
156 156
157 157 def get_dict(self, request):
158 158 instance_dict = super(ReportGroup, self).get_dict()
159 159 instance_dict['server_name'] = self.get_report().tags.get(
160 160 'server_name')
161 161 instance_dict['view_name'] = self.get_report().tags.get('view_name')
162 162 instance_dict['resource_name'] = self.application.resource_name
163 163 instance_dict['report_type'] = self.get_report().report_type
164 164 instance_dict['url_path'] = self.get_report().url_path
165 165 instance_dict['front_url'] = self.get_report().get_public_url(request)
166 166 del instance_dict['triggered_postprocesses_ids']
167 167 return instance_dict
168 168
169 169 def es_doc(self):
170 170 return {
171 171 '_id': str(self.id),
172 172 'pg_id': str(self.id),
173 173 'resource_id': self.resource_id,
174 174 'error': self.error,
175 175 'fixed': self.fixed,
176 176 'public': self.public,
177 177 'read': self.read,
178 178 'priority': self.priority,
179 179 'occurences': self.occurences,
180 180 'average_duration': self.average_duration,
181 181 'summed_duration': self.summed_duration,
182 182 'first_timestamp': self.first_timestamp,
183 183 'last_timestamp': self.last_timestamp
184 184 }
185 185
186 186 def set_notification_info(self, notify_10=False, notify_100=False):
187 187 """
188 188 Update redis notification maps for notification job
189 189 """
190 190 current_time = datetime.utcnow().replace(second=0, microsecond=0)
191 191 # global app counter
192 192 key = REDIS_KEYS['counters']['reports_per_type'].format(
193 193 self.report_type, current_time)
194 194 Datastores.redis.incr(key)
195 195 Datastores.redis.expire(key, 3600 * 24)
196 # detailed app notification
196 # detailed app notification for alerts and notifications
197 197 Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'],
198 198 self.resource_id)
199 Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports_alerting'],
200 self.resource_id)
199 201 # only notify for exceptions here
200 202 if self.report_type == ReportType.error:
201 Datastores.redis.sadd(REDIS_KEYS['apps_that_had_reports'],
203 Datastores.redis.sadd(
204 REDIS_KEYS['apps_that_had_reports'],
205 self.resource_id)
206 Datastores.redis.sadd(
207 REDIS_KEYS['apps_that_had_error_reports_alerting'],
202 208 self.resource_id)
203 209 key = REDIS_KEYS['counters']['report_group_occurences'].format(self.id)
204 210 Datastores.redis.incr(key)
205 211 Datastores.redis.expire(key, 3600 * 24)
212 key = REDIS_KEYS['counters']['report_group_occurences_alerting'].format(self.id)
213 Datastores.redis.incr(key)
214 Datastores.redis.expire(key, 3600 * 24)
206 215
207 216 if notify_10:
208 217 key = REDIS_KEYS['counters'][
209 218 'report_group_occurences_10th'].format(self.id)
210 219 Datastores.redis.setex(key, 3600 * 24, 1)
211 220 if notify_100:
212 221 key = REDIS_KEYS['counters'][
213 222 'report_group_occurences_100th'].format(self.id)
214 223 Datastores.redis.setex(key, 3600 * 24, 1)
215 224
216 225 key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format(
217 226 self.report_type, self.resource_id)
218 227 Datastores.redis.sadd(key, self.id)
219 228 Datastores.redis.expire(key, 3600 * 24)
229 key = REDIS_KEYS['reports_to_notify_per_type_per_app_alerting'].format(
230 self.report_type, self.resource_id)
231 Datastores.redis.sadd(key, self.id)
232 Datastores.redis.expire(key, 3600 * 24)
220 233
221 234 @property
222 235 def partition_id(self):
223 236 return 'rcae_r_%s' % self.first_timestamp.strftime('%Y_%m')
224 237
225 238
226 239 def after_insert(mapper, connection, target):
227 240 if not hasattr(target, '_skip_ft_index'):
228 241 data = target.es_doc()
229 242 data.pop('_id', None)
230 243 Datastores.es.index(target.partition_id, 'report_group',
231 244 data, id=target.id)
232 245
233 246
234 247 def after_update(mapper, connection, target):
235 248 if not hasattr(target, '_skip_ft_index'):
236 249 data = target.es_doc()
237 250 data.pop('_id', None)
238 251 Datastores.es.index(target.partition_id, 'report_group',
239 252 data, id=target.id)
240 253
241 254
242 255 def after_delete(mapper, connection, target):
243 256 query = {'term': {'group_id': target.id}}
244 257 Datastores.es.delete_by_query(target.partition_id, 'report', query)
245 258 query = {'term': {'pg_id': target.id}}
246 259 Datastores.es.delete_by_query(target.partition_id, 'report_group', query)
247 260
248 261
249 262 sa.event.listen(ReportGroup, 'after_insert', after_insert)
250 263 sa.event.listen(ReportGroup, 'after_update', after_update)
251 264 sa.event.listen(ReportGroup, 'after_delete', after_delete)
@@ -1,158 +1,156 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # App Enlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 import logging
23 23 import pyramid_mailer
24 24 import pyramid.renderers
25 25 import sqlalchemy as sa
26 26
27 27 from collections import namedtuple
28 28 from datetime import datetime
29 29
30 30 from appenlight.lib.rule import Rule
31 31 from appenlight.models import get_db_session
32 32 from appenlight.models.integrations import IntegrationException
33 33 from appenlight.models.report import REPORT_TYPE_MATRIX
34 34 from appenlight.models.user import User
35 35 from appenlight.models.services.base import BaseService
36 36 from paginate_sqlalchemy import SqlalchemyOrmPage
37 37 from pyramid.threadlocal import get_current_registry
38 38
39 39 log = logging.getLogger(__name__)
40 40
41 41 GroupOccurence = namedtuple('GroupOccurence', ['occurences', 'group'])
42 42
43 43
44 44 class UserService(BaseService):
45 45 @classmethod
46 46 def all(cls, db_session=None):
47 47 return get_db_session(db_session).query(User).order_by(User.user_name)
48 48
49 49 @classmethod
50 50 def send_email(cls, request, recipients, variables, template,
51 51 immediately=False, silent=False):
52 52 html = pyramid.renderers.render(template, variables, request)
53 53 title = variables.get('email_title',
54 54 variables.get('title', "No Title"))
55 55 title = title.replace('\r', '').replace('\n', '')
56 56 sender = "{} <{}>".format(
57 57 request.registry.settings['mailing.from_name'],
58 58 request.registry.settings['mailing.from_email'])
59 59 message = pyramid_mailer.message.Message(
60 60 subject=title, sender=sender, recipients=recipients, html=html)
61 61 if immediately:
62 62 try:
63 63 request.registry.mailer.send_immediately(message)
64 64 except Exception as e:
65 65 log.warning('Exception %s' % e)
66 66 if not silent:
67 67 raise
68 68 else:
69 69 request.registry.mailer.send(message)
70 70
71 71 @classmethod
72 72 def get_paginator(cls, page=1, item_count=None, items_per_page=50,
73 73 order_by=None, filter_settings=None,
74 74 exclude_columns=None, db_session=None):
75 75 registry = get_current_registry()
76 76 if not exclude_columns:
77 77 exclude_columns = []
78 78 if not filter_settings:
79 79 filter_settings = {}
80 80 db_session = get_db_session(db_session)
81 81 q = db_session.query(User)
82 82 if filter_settings.get('order_col'):
83 83 order_col = filter_settings.get('order_col')
84 84 if filter_settings.get('order_dir') == 'dsc':
85 85 sort_on = 'desc'
86 86 else:
87 87 sort_on = 'asc'
88 88 q = q.order_by(getattr(sa, sort_on)(getattr(User, order_col)))
89 89 else:
90 90 q = q.order_by(sa.desc(User.registered_date))
91 91 # remove urlgen or it never caches count
92 92 cache_params = dict(filter_settings)
93 93 cache_params.pop('url', None)
94 94 cache_params.pop('url_maker', None)
95 95
96 96 @registry.cache_regions.redis_min_5.cache_on_arguments()
97 97 def estimate_users(cache_key):
98 98 o_q = q.order_by(False)
99 99 return o_q.count()
100 100
101 101 item_count = estimate_users(cache_params)
102 102 # if the number of pages is low we may want to invalidate the count to
103 103 # provide 'real time' update - use case -
104 104 # errors just started to flow in
105 105 if item_count < 1000:
106 106 item_count = estimate_users.refresh(cache_params)
107 107 paginator = SqlalchemyOrmPage(q, page=page,
108 108 item_count=item_count,
109 109 items_per_page=items_per_page,
110 110 **filter_settings)
111 111 return paginator
112 112
113 113 @classmethod
114 114 def get_valid_channels(cls, user):
115 115 return [channel for channel in user.alert_channels
116 116 if channel.channel_validated]
117 117
118 118 @classmethod
119 119 def report_notify(cls, user, request, application, report_groups,
120 occurence_dict, since_when=None, db_session=None):
120 occurence_dict, db_session=None):
121 121 db_session = get_db_session(db_session)
122 122 if not report_groups:
123 123 return True
124
125 if not since_when:
126 124 since_when = datetime.utcnow()
127 125 for channel in cls.get_valid_channels(user):
128 126 confirmed_groups = []
129 127
130 128 for group in report_groups:
131 129 occurences = occurence_dict.get(group.id, 1)
132 130 for action in channel.channel_actions:
133 131 not_matched = (
134 132 action.resource_id and action.resource_id !=
135 133 application.resource_id)
136 134 if action.type != 'report' or not_matched:
137 135 continue
138 136 should_notify = (action.action == 'always' or
139 137 not group.notified)
140 138 rule_obj = Rule(action.rule, REPORT_TYPE_MATRIX)
141 139 report_dict = group.get_report().get_dict(request)
142 140 if rule_obj.match(report_dict) and should_notify:
143 141 item = GroupOccurence(occurences, group)
144 142 if item not in confirmed_groups:
145 143 confirmed_groups.append(item)
146 144
147 145 # send individual reports
148 146 total_confirmed = len(confirmed_groups)
149 147 if not total_confirmed:
150 148 continue
151 149 try:
152 150 channel.notify_reports(resource=application,
153 151 user=user,
154 152 request=request,
155 153 since_when=since_when,
156 154 reports=confirmed_groups)
157 155 except IntegrationException as e:
158 156 log.warning('%s' % e)
@@ -1,176 +1,184 b''
1 1 [app:appenlight]
2 2 use = egg:appenlight
3 3 reload_templates = false
4 4 debug_authorization = false
5 5 debug_notfound = false
6 6 debug_routematch = false
7 7 debug_templates = false
8 8 default_locale_name = en
9 9 sqlalchemy.url = {{ appenlight_dbstring }}
10 10 sqlalchemy.pool_size = 10
11 11 sqlalchemy.max_overflow = 50
12 12 sqlalchemy.echo = false
13 13 jinja2.directories = appenlight:templates
14 14 jinja2.filters = nl2br = appenlight.lib.jinja2_filters.nl2br
15 15
16 16 #includes
17 17 appenlight.includes =
18 18
19 19 #redis
20 20 redis.url = redis://localhost:6379/0
21 21 redis.redlock.url = redis://localhost:6379/3
22 22
23 23 #elasticsearch
24 24 elasticsearch.nodes = http://127.0.0.1:9200
25 25
26 26 #dirs
27 27 webassets.dir = %(here)s/webassets/
28 28
29 29 # encryption
30 30 encryption_secret = {{appenlight_encryption_secret}}
31 31
32 32 #authtkt
33 33 # uncomment if you use SSL
34 34 # authtkt.secure = true
35 35 authtkt.secret = {{appenlight_authtkt_secret}}
36 36 # session settings
37 37 redis.sessions.secret = {{appenlight_redis_session_secret}}
38 38 redis.sessions.timeout = 86400
39 39
40 40 # session cookie settings
41 41 redis.sessions.cookie_name = appenlight
42 42 redis.sessions.cookie_max_age = 2592000
43 43 redis.sessions.cookie_path = /
44 44 redis.sessions.cookie_domain =
45 45 # uncomment if you use SSL
46 46 redis.sessions.cookie_secure = True
47 47 redis.sessions.cookie_httponly = True
48 48 redis.sessions.cookie_on_exception = True
49 49 redis.sessions.prefix = appenlight:session:
50 50
51 51 #cache
52 52 cache.regions = default_term, second, short_term, long_term
53 53 cache.type = ext:memcached
54 54 cache.url = 127.0.0.1:11211
55 55 cache.lock_dir = %(here)s/data/cache/lock
56 56 cache.second.expire = 1
57 57 cache.short_term.expire = 60
58 58 cache.default_term.expire = 300
59 59
60 60 #mailing
61 61 mailing.app_url = https://{{appenlight_domain}}
62 62 mailing.from_name = App Enlight
63 63 mailing.from_email = no-reply@{{appenlight_domain}}
64 64
65 65 ###
66 66 # Authomatic configuration
67 67 ###
68 68
69 69 authomatic.secret =
70 70 authomatic.pr.facebook.app_id =
71 71 authomatic.pr.facebook.secret =
72 72 authomatic.pr.twitter.key =
73 73 authomatic.pr.twitter.secret =
74 74 authomatic.pr.google.key =
75 75 authomatic.pr.google.secret =
76 76 authomatic.pr.github.key =
77 77 authomatic.pr.github.secret =
78 78 authomatic.pr.github.scope =
79 79 authomatic.pr.bitbucket.key =
80 80 authomatic.pr.bitbucket.secret =
81 81
82 82 #ziggurat
83 83 ziggurat_foundations.model_locations.User = appenlight.models.user:User
84 84 ziggurat_foundations.sign_in.username_key = sign_in_user_name
85 85 ziggurat_foundations.sign_in.password_key = sign_in_user_password
86 86 ziggurat_foundations.sign_in.came_from_key = came_from
87 87
88 88 #cometd
89 89 cometd.server = http://127.0.0.1:8088
90 90 cometd.secret = secret
91 91 cometd.ws_url = wss://{{appenlight_domain}}/channelstream
92 92
93 93 # for celery
94 94 appenlight.api_key =
95 95 appenlight.transport_config =
96 96 appenlight.public_api_key =
97 97
98 98 # celery
99 99 celery.broker_type = redis
100 100 celery.broker_url = redis://localhost:6379/3
101 101 celery.concurrency = 2
102 102 celery.timezone = UTC
103 103
104 # tasks
105
106 # how often to run the report notifications task (60s default)
107 tasks.notifications_reports.interval = 60
108
104 109 [filter:paste_prefix]
105 110 use = egg:PasteDeploy#prefix
106 111
107 112
108 113 [filter:appenlight_client]
109 114 use = egg:appenlight_client
110 115 appenlight.api_key =
116
117 # appenlight.transport_config = http://127.0.0.1:6543?threaded=1&timeout=5&verify=0
118 # by default uses api.appenlight.com server
111 119 appenlight.transport_config =
112 120 appenlight.report_local_vars = true
113 121 appenlight.report_404 = true
114 122 appenlight.timing.dbapi2_psycopg2 = 0.3
115 123
116 124
117 125
118 126 [pipeline:main]
119 127 pipeline = paste_prefix
120 128 appenlight_client
121 129 appenlight
122 130
123 131
124 132
125 133 [server:main]
126 134 use = egg:gunicorn#main
127 135 host = 0.0.0.0:6543, unix:/tmp/appenlight.sock
128 136 workers = 2
129 137 timeout = 90
130 138 max_requests = 10000
131 139
132 140 [server:api]
133 141 use = egg:gunicorn#main
134 142 host = 0.0.0.0:6553, unix:/tmp/api.appenlight.sock
135 143 workers = 2
136 144 max_requests = 10000
137 145
138 146
139 147 # Begin logging configuration
140 148
141 149 [loggers]
142 150 keys = root, appenlight, sqlalchemy
143 151
144 152 [handlers]
145 153 keys = console
146 154
147 155 [formatters]
148 156 keys = generic
149 157
150 158 [logger_root]
151 159 level = WARN
152 160 handlers = console
153 161
154 162 [logger_appenlight]
155 163 level = WARN
156 164 handlers =
157 165 qualname = appenlight
158 166
159 167 [logger_sqlalchemy]
160 168 level = WARN
161 169 handlers =
162 170 qualname = sqlalchemy.engine
163 171 # "level = INFO" logs SQL queries.
164 172 # "level = DEBUG" logs SQL queries and results.
165 173 # "level = WARN" logs neither. (Recommended for production systems.)
166 174
167 175 [handler_console]
168 176 class = StreamHandler
169 177 args = (sys.stderr,)
170 178 level = NOTSET
171 179 formatter = generic
172 180
173 181 [formatter_generic]
174 182 format = %(asctime)s %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s
175 183
176 184 # End logging configuration